Commit 1b4f729b authored by confusion's avatar confusion

修改创建基金时 创建默认权限表

parent ed8fbd02
......@@ -13,9 +13,9 @@ from exception.token import FundPermissionError
from model import Response, PageResponse, Page
from model.fund import FundType, StakingFund, NormalFund, FundStatus
from dependencies import get_current_user, get_fund_collect, get_scheduler, get_permission_user_collect, \
get_permission_role_collect
get_permission_role_collect, get_all_permission_collect
from schema.fund import CreateFund, UpdateFund
from service.permission import create_default_role_and_user, check_permission
from service.permission import create_default_role_and_user, check_permission, create_all_sys_permission
from service.scheduler import delete_nav_task, calculate_nav_task
from tools.jwt_tools import User
......@@ -36,7 +36,8 @@ async def create(
scheduler: AsyncIOScheduler = Depends(get_scheduler),
fund_collect: AgnosticCollection = Depends(get_fund_collect),
permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect),
permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect)
permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect),
all_permission_collect: AgnosticCollection = Depends(get_all_permission_collect)
):
create_model = fund_type_map[create_fund.fund_type](**create_fund.dict(), nodes=[], **user.db_save())
create_model.nav = create_model.base_nav
......@@ -45,6 +46,8 @@ async def create(
response_model = fund_type_map[data['fund_type']]
await fund_collect.insert_one(data)
await create_default_role_and_user(data['id'], user.email, permission_user_collect, permission_role_collect)
await create_all_sys_permission(data['id'], all_permission_collect) # 创建默认权限数据
job_id = f"calculate_nav_{data['id']}"
time_obj = datetime.datetime.strptime(data["settlement_time"], "%H:%M")
scheduler.add_job(
......
from motor.core import AgnosticCollection
from dependencies import get_current_user, get_permission_user_collect, get_permission_role_collect, \
get_permission_label_map
get_all_permission_collect
from exception.token import FundPermissionError
from model import BaseResponse, Response
from fastapi import APIRouter, Depends
from schema.permission import CreateUserInfo
from service.permission import check_permission, find_user_permission, build_permission_tree
from service.permission import check_permission, find_user_permission, build_permission_tree, \
find_all_sys_permission
from tools.jwt_tools import User
router = APIRouter()
......@@ -45,7 +46,6 @@ async def create_permission(
async def create_permission(
fund_id: str,
user: User = Depends(get_current_user),
permission_label_map: dict = Depends(get_permission_label_map),
permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect),
permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect)
):
......@@ -55,8 +55,27 @@ async def create_permission(
permission_role_collect=permission_role_collect)
role_db_data = await permission_role_collect.find({'fund_id': fund_id, "name": {"$in": roles}}).to_list(length=None)
result_role_data = [{"name": item["name"], "label": item["label"]} for item in role_db_data]
permissions_tree = build_permission_tree(permissions, permission_label_map)
permissions_tree = build_permission_tree(permissions)
return Response(data={'permission': permissions_tree, "roles": result_role_data})
@router.get('/all/',
response_model=BaseResponse,
summary='查询所有权限',
description='查询所有权限')
async def create_permission(
fund_id: str,
user: User = Depends(get_current_user),
all_permission_collect: AgnosticCollection = Depends(get_all_permission_collect),
permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect),
permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect)
):
assert await check_permission(['role_permission.role.edit'], fund_id=fund_id, email=user.email,
permission_user_collect=permission_user_collect,
permission_role_collect=permission_role_collect), FundPermissionError()
permissions_tree = await find_all_sys_permission(fund_id=fund_id, all_permission_collect=all_permission_collect)
return Response(data={'permission': permissions_tree})
# @router.post('/role',
# response_model=BaseResponse,
# summary='添加角色',
......
from typing import List, Any, Optional
import aioredis
from fastapi import APIRouter, Depends, Query
from motor.core import AgnosticCollection
......@@ -17,7 +18,7 @@ async def get(
symbol: List[str] = Query(..., description='查询的币种'),
query_time: Optional[float] = Query(None, description='查询时间,为空时返回最新报价'),
hour_price_collect: AgnosticCollection = Depends(get_hour_price_collect),
cmc_price_redis: AioRedisManager = Depends(get_cmc_price_redis)
cmc_price_redis: aioredis.Redis = Depends(get_cmc_price_redis)
):
if query_time:
db_client = hour_price_collect
......
import aioredis
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Security, Depends
from fastapi.security import HTTPAuthorizationCredentials
......@@ -22,7 +23,7 @@ def get_permission_label_map(request: Request) -> dict:
def get_current_user(credentials: HTTPAuthorizationCredentials = Security(jwt_tools.security)) -> User:
if settings.env == 'LOCAL':
return User(id=credentials.credentials, email='wangzian@matrixone.io')
return User(id=credentials.credentials, email='ganfangsu@163.com')
return jwt_tools.get_current_user(credentials)
......@@ -73,6 +74,11 @@ def get_permission_role_collect(
return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='permission_role')
def get_all_permission_collect(
mongodb_manager: AioMongodbManager = Depends(get_mongodb_manager)) -> AgnosticCollection:
return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='sys_all_permission')
# 获取redis Client
def get_cmc_price_redis(redis_manager: AioRedisManager = Depends(get_redis_manager)) -> AioRedisManager:
def get_cmc_price_redis(redis_manager: AioRedisManager = Depends(get_redis_manager)) -> aioredis.Redis:
return redis_manager.get_client(name='cmc_price')
......@@ -9,8 +9,8 @@ class MyException(Exception):
status = 400
def __init__(self, message: Optional[str] = None, status: Optional[int] = None):
if not message:
logger.warning(traceback.format_exc())
# if not message:
# logger.warning(traceback.format_exc())
self.message = message or self.message
self.status = status or self.status
......
......@@ -16,7 +16,6 @@ from db import register_mongodb, register_redis
from exception import MyException
from model import ErrorResponse
from service.beacon import BeaconChaService
from service.permission import make_permission_tree_and_label_map
from service.price import CMCPrice
from service.scheduler import update_staking_node_status_task
from tools.jwt_tools import get_identify_key
......@@ -86,7 +85,7 @@ async def startup():
misfire_grace_time=600 * 3
)
app.state.permission_tree, app.state.label_map = make_permission_tree_and_label_map()
# app.state.permission_tree, app.state.label_map = make_permission_tree_and_label_map()
app.state.scheduler.add_job(
update_staking_node_status_task,
......
This diff is collapsed.
......@@ -11,7 +11,7 @@ from configs import settings
from db import AioMongodbManager
from dependencies import get_hour_price_collect
from exception.http import RequestHttpException
from tools.cache_helper import update_cache, delete_cache, get_many_cache, save_cache, get_cache
from tools.cache_helper import update_cache, delete_cache, get_many_cache, save_cache
from tools.http_helper import aio_request
from tools.time_helper import utc_now
......@@ -102,9 +102,9 @@ class CMCPrice:
logger.error(f'[删除存在数据] {symbol} [{len(delete_time)}]')
@staticmethod
def get_end_time(offset=0):
def get_now_hour(offset=None):
return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC, microsecond=0, second=0,
minute=0) - datetime.timedelta(hours=1 + offset)
minute=0) + (offset or datetime.timedelta(hours=0))
async def ohlcv_request_core(self, symbol, start_time=None, count=5):
"""
......@@ -117,8 +117,10 @@ class CMCPrice:
await self.init()
url = f'https://web-api.coinmarketcap.com/v2/cryptocurrency/ohlcv/historical?interval=1h&time_period=hourly&count={count}&symbol={symbol}&time_start={int(start_time.timestamp())}'
error = ''
# 过滤数据在开始和结束时间范围内
end_time = self.get_now_hour()
for i in range(5):
for_save_data = []
res_data_list = []
try:
res_data = await aio_request(
url=url, proxies=self.proxies,
......@@ -126,7 +128,7 @@ class CMCPrice:
)
if not res_data['data']:
logger.warning(f'[没有更多数据] [{symbol}]')
return for_save_data
return res_data_list
for item in res_data['data'][symbol][0]['quotes']:
ohlcv_item = item['quote']['USD']
del ohlcv_item['timestamp']
......@@ -135,30 +137,27 @@ class CMCPrice:
'time': datetime.datetime.fromisoformat(item['time_open']),
'create_time': datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
})
for_save_data.append(ohlcv_item)
return for_save_data
if end_time >= ohlcv_item['time'] > start_time:
res_data_list.append(ohlcv_item)
return res_data_list
except Exception as e:
error = str(e)
logger.warning(f'[k线请求失败] [10秒后重试] [{url}] {e}')
await asyncio.sleep(10)
raise RequestHttpException(message=f'[请求失败] [{url}] [{error}]')
async def task_core(self, symbol, this_start_time):
async def loop_request(self, symbol, start_time):
await self.init()
try:
for_save_data = await self.ohlcv_request_core(symbol, this_start_time, count=self.per_request_limit)
except RequestHttpException as e:
raise Exception(f'[小时数据请求失败] [{symbol}] [{this_start_time}] [{e}]')
# 过滤数据在开始和结束时间范围内
next_start = None
end_time = self.get_end_time()
filtered_data = []
for item in for_save_data:
next_start = item['time']
if end_time >= item['time'] > this_start_time:
filtered_data.append(item)
next_start = start_time
if not filtered_data:
return None
while True:
for_save_data = await self.ohlcv_request_core(symbol, next_start, count=self.per_request_limit)
if not for_save_data:
break
next_start = for_save_data[-1]['time']
yield for_save_data
async def save_db(self, symbol, filtered_data):
# 删除存在数据
hour_price_collect = get_hour_price_collect(self.mongodb_manager)
await self.delete_exist_data(symbol, filtered_data, hour_price_collect)
......@@ -167,53 +166,72 @@ class CMCPrice:
inserted = len(insert_res.inserted_ids)
logger.info(
f'[小时数据写入成功] [{symbol}] [{inserted}] [{filtered_data[0]["time"]} ... {filtered_data[-1]["time"]}]')
if next_start >= end_time:
next_start = None
return next_start
async def history_task(self, symbol):
async def start_task(self, symbol, start_time):
"""
根据数据库缓存信息 执行任务
"""
task_status = await get_cache(self.mongodb_manager, name=f'hour_price_task_{symbol}')
if not task_status:
return
this_start_time = task_status['next_start']
try:
next_start = await self.task_core(symbol, this_start_time)
async for filtered_data in self.loop_request(symbol, start_time):
await self.save_db(symbol, filtered_data)
except Exception as e:
logger.error(e)
return await self.update_task_status(symbol, this_start_time, str(e))
if not next_start:
logger.info(f'[小时数据完成] [{symbol}]')
return
return await delete_cache(self.mongodb_manager, name=f'hour_price_task_{symbol}')
else:
# 保存进度 用于重启后继续获取
await self.update_task_status(symbol, next_start)
return await self.history_task(symbol)
async def start_init_ohlcv(self, symbol, resume=False):
async def query_last_data(self, symbol):
hour_price_collect = get_hour_price_collect(self.mongodb_manager)
data = await hour_price_collect.find_one({'symbol': symbol}, sort=[('time', -1)])
return data
async def start_init_ohlcv(self, symbol):
await self.init()
if not resume:
start_time = await self.query_added_date(symbol)
# 新建进度
await self.update_task_status(symbol, start_time)
await self.history_task(symbol)
logger.info(f'[开始初始化] [{symbol}]')
await self.start_task(symbol, start_time)
logger.info(f'[初始化完成] [{symbol}]')
async def start_update_ohlcv(self):
# 更新需要获取报价币种
await self.load_base_coins()
for symbol in self.base_coins:
start_time = self.get_end_time(offset=2)
async def start_update_ohlcv(self, symbol=None):
await self.init()
symbol_list = [symbol] if symbol else self.base_coins
for symbol in symbol_list:
logger.info(f'[开始更新] [{symbol}]')
last_data = await self.query_last_data(symbol)
start_time = last_data['time']
# 新建进度
await self.update_task_status(symbol, start_time)
await self.history_task(symbol)
logger.info(f'[更新数据] [{symbol}] [从{start_time}开始]')
await self.start_task(symbol, start_time)
logger.info(f'[更新完成] [{symbol}]')
async def get_index_price_map(self, symbol_list):
await self.init()
start_time = self.get_now_hour(offset=-datetime.timedelta(days=7))
for symbol in symbol_list:
async for filtered_data in self.loop_request(symbol, start_time):
yield symbol, filtered_data
async def start_update_and_init(self, symbol=None):
await self.init()
symbol_list = [symbol] if symbol else self.base_coins
for symbol in symbol_list:
if symbol == 'USD':
logger.warning('[跳过] [USD]')
continue
if not await self.is_init_symbol(symbol):
await self.start_init_ohlcv(symbol)
else:
await self.start_update_ohlcv(symbol)
async def is_init_symbol(self, symbol):
hour_price_collect = get_hour_price_collect(self.mongodb_manager)
result = hour_price_collect.find({'symbol': symbol}).limit(1)
data = await result.to_list(length=None)
return bool(data)
async def check_data(self, symbol, start_time=None):
await self.init()
start_time = start_time or (await self.query_added_date(symbol))
end_time = self.get_end_time()
end_time = self.get_now_hour(offset=-datetime.timedelta(hours=1))
find_end_time = min([start_time + datetime.timedelta(hours=24 * 365), end_time])
hour_price_collect = get_hour_price_collect(self.mongodb_manager)
lost_data = []
......@@ -289,6 +307,8 @@ if __name__ == '__main__':
mongodb_manager = AioMongodbManager()
mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund')
cmc_price = CMCPrice(mongodb_manager)
asyncio.run(cmc_price.start_update_ohlcv())
# asyncio.run(cmc_price.start_update_and_init())
# asyncio.run(cmc_price.get_index_price_map(cmc_price.base_coins))
# asyncio.run(cmc_price.save_history_data('BTC'))
# asyncio.run(cmc_price.start_init_ohlcv('USDC', resume=False))
......@@ -17,5 +17,5 @@ async def aio_request(url, method='GET', json_res=True, **kwargs):
logger.info(f'请求成功 [{method}] [{url}]')
return res
except Exception as e:
logger.error(f'请求失败 [{method}] [{url}] [{e}]')
logger.error(f'[Requests错误] [{method}] [{url}] [{e}]')
raise RequestHttpException(message=str(e))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment