Commit e0cae5f3 authored by 杨明橙's avatar 杨明橙

修改净值计算任务

parent 51af1633
......@@ -58,6 +58,7 @@ async def update(
})
data = await fund_collect.find_one_and_update({'id': fund_id, 'user_id': user.id}, {'$set': db_update_data},
return_document=ReturnDocument.AFTER)
assert data, NotFundError()
if data['fund_type'] == FundType.staking:
return Response[StakingFund](data=StakingFund(**data))
else:
......@@ -72,8 +73,7 @@ async def get(
fund_collect: AgnosticCollection = Depends(get_fund_collect)
):
data = await fund_collect.find_one({'id': fund_id, 'user_id': user.id})
if not data:
raise NotFundError()
assert data, NotFundError()
if data['fund_type'] == FundType.staking:
response = Response[StakingFund](data=StakingFund(**data).dict())
else:
......
......@@ -19,8 +19,8 @@ async def create(
fund_collect: AgnosticCollection = Depends(get_fund_collect),
scheduler: AsyncIOScheduler = Depends(get_scheduler)
):
await calculate_nav_task(fund_id, scheduler, fund_collect, user.id)
return BaseResponse(message='创建成功')
job_id = await calculate_nav_task(fund_id, scheduler, fund_collect, user.id)
return BaseResponse(message='创建成功', data={"job_id": job_id})
@router.delete('/',
......
......@@ -4,7 +4,10 @@ from configs import env, make_dburi
class Settings:
env = env
name = '本地环境'
mongodb_uri = make_dburi(schema='mongodb', sock_path='13.115.26.128:27018', user_name='root', pwd='ETHQig66tzxoZc+wuIPEUTMVsY')
py_fund_mongodb_uri = make_dburi(schema='mongodb', sock_path='13.115.26.128:27018',
user_name='root', pwd='ETHQig66tzxoZc+wuIPEUTMVsY')
jasper_mongodb_uri = make_dburi(schema='mongodb', sock_path='13.115.26.128:27018',
user_name='root', pwd='ETHQig66tzxoZc+wuIPEUTMVsY') # 数据库版本问题
# mongodb = mongodb_url('127.0.0.1:27017')
redis_uri = make_dburi(schema='redis', sock_path='13.115.26.128:6379', pwd='MukAzxGMOL2')
......
......@@ -5,7 +5,8 @@ from db.redis_helper import AioRedisManager
def register_mongodb(app):
mongodb_manager = AioMongodbManager()
mongodb_manager.setup_pool(settings.mongodb_uri, 'pyfund')
mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund')
mongodb_manager.setup_pool(settings.jasper_mongodb_uri, 'jasper')
app.state.mongodb_manager = mongodb_manager
......
......@@ -43,6 +43,11 @@ def get_bill_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_ma
return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='bill')
def get_hour_price_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_manager)) -> AgnosticCollection:
# return mongodb_manager.get_client(name='jasper', db='TradeCenter_c', collect='HourMarketQuotes') # todo 数据库版本问题
return mongodb_manager.get_client(name='jasper', db='Market', collect='HourMarketQuotes')
# 获取redis Client
def get_cmc_price_redis(redis_manager: AioRedisManager = Depends(get_redis_manager)):
return redis_manager.get_client(name='cmc_price')
......@@ -68,7 +68,7 @@ async def startup():
app.include_router(api_router)
# 添加定时任务
app.state.scheduler = create_scheduler(settings.mongodb_uri)
app.state.scheduler = create_scheduler(settings.py_fund_mongodb_uri)
app.state.scheduler.start()
app.state.scheduler.print_jobs()
......
......@@ -4,7 +4,7 @@ from typing import List, Optional
from pydantic import BaseModel, Field
from model import BaseCreateModel
from model.asset import NormalAsset
from model.asset import NormalAsset, BaseCoinAssetItem
from model.node import BaseNode
......@@ -53,7 +53,7 @@ class NormalFund(BaseFundItem, BaseCreateModel):
user_id: str
user_email: str
nav: Optional[float] = Field(default=1, description='当前净值')
assets: List[NormalAsset] = Field(default=[], description='持仓')
assets: List[BaseCoinAssetItem] = Field(default=[], description='持仓')
class StakingFund(BaseFundItem, BaseCreateModel):
......@@ -61,4 +61,4 @@ class StakingFund(BaseFundItem, BaseCreateModel):
user_email: str
nav: Optional[float] = Field(default=1, description='当前净值')
nodes: List[BaseNode]
assets: List[NormalAsset] = Field(default=[], description='持仓')
assets: List[BaseCoinAssetItem] = Field(default=[], description='持仓')
......@@ -15,7 +15,7 @@ class BaseNode(BaseCreateModel):
pub_key: str = Field(..., description='绑定的key')
status: NodeStatus = Field(default=NodeStatus.pending, description='状态')
currency: str = Field(default='ETH', description='节点需要质押的币种')
value: float = Field(default=32, description='节点需要质押的币种数量')
volume: float = Field(default=32, description='节点需要质押的币种数量')
# 接口请求模型 创建
......@@ -23,4 +23,4 @@ class CreateNode(BaseModel):
pub_key: str = Field(..., description='绑定的key')
fund_id: str = Field(..., description='绑定基金的ID')
currency: str = Field(default='ETH', description='节点需要质押的币种')
value: float = Field(default=32, description='节点需要质押的币种数量')
volume: float = Field(default=32, description='节点需要质押的币种数量')
import datetime
import pytz
from loguru import logger
from dependencies import get_fund_collect, get_cmc_price_redis
from dependencies import get_fund_collect, get_cmc_price_redis, get_hour_price_collect
from model.fund import FundStatus
from tools.price_helper import get_price
async def calculate_nav(fund_id):
async def calculate_nav(fund_id, set_time=False):
from main import app
redis_client = get_cmc_price_redis(app.state.redis_manager)
fund_collect = get_fund_collect(app.state.mongodb_manager)
logger.info(f'[定时任务开始执行] [计算净值] {fund_id} ')
fund_data = await fund_collect.find_one({'id': fund_id, 'fund_status': FundStatus.active})
for coin_item in fund_data['assets']:
data = await redis_client.hget(f'CMC:CoinPrice:{coin_item["currency"]}', 'Price')
print(f'获取到报价 {data}')
# todo 未完成具体实现
amount = {}
for item in fund_data['assets']:
amount.setdefault(item['currency'], 0)
amount[item['currency']] += item['volume']
# if not set_time:
# coin_price = await redis_client.hget(f'CMC:CoinPrice:{coin_item["currency"]}', 'Price')
for item in fund_data['nodes']:
amount.setdefault(item['currency'], 0)
amount[item['currency']] += item['volume']
set_time = datetime.datetime.utcnow().replace(minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
if set_time:
hour_price_collect = get_hour_price_collect(app.state.mongodb_manager)
price_data = await get_price(list(amount), set_time, hour_price_collect)
else:
redis_client = get_cmc_price_redis(app.state.redis_manager)
price_data = await get_price(list(amount), None, redis_client)
print(f'获取到报价 {price_data}')
# todo 未完成具体实现
......@@ -37,3 +37,4 @@ async def calculate_nav_task(fund_id, schedular, fund_collect: AgnosticCollectio
trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC),
misfire_grace_time=10)
schedular.print_jobs()
return job_id
async def get_price(coin_list, query_time, db_client):
if not query_time:
temp_index = []
result_dict = {}
async with db_client.pipeline(transaction=True) as pipe:
for coin_item in coin_list:
pipe.hget(f'CMC:CoinPrice:{coin_item}', 'Price')
temp_index.append(coin_item)
data = await pipe.execute()
for coin_name, result_item in zip(temp_index, data):
result_dict[coin_name] = float(result_item)
else:
cursor = db_client.find(
{'ReqTime': query_time, "SequenceCode": "CMC", "PayCurrency": "USD", "MOSymbol": {"$in": coin_list}})
result_dict = {}
temp_res = await cursor.to_list(length=None)
for item in temp_res:
result_dict[item['MOSymbol']] = item['Price']
return result_dict
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment