Commit 711e4658 authored by 陈涛's avatar 陈涛
parents 43b96ec1 e0cae5f3
...@@ -58,6 +58,7 @@ async def update( ...@@ -58,6 +58,7 @@ async def update(
}) })
data = await fund_collect.find_one_and_update({'id': fund_id, 'user_id': user.id}, {'$set': db_update_data}, data = await fund_collect.find_one_and_update({'id': fund_id, 'user_id': user.id}, {'$set': db_update_data},
return_document=ReturnDocument.AFTER) return_document=ReturnDocument.AFTER)
assert data, NotFundError()
if data['fund_type'] == FundType.staking: if data['fund_type'] == FundType.staking:
return Response[StakingFund](data=StakingFund(**data)) return Response[StakingFund](data=StakingFund(**data))
else: else:
...@@ -72,8 +73,7 @@ async def get( ...@@ -72,8 +73,7 @@ async def get(
fund_collect: AgnosticCollection = Depends(get_fund_collect) fund_collect: AgnosticCollection = Depends(get_fund_collect)
): ):
data = await fund_collect.find_one({'id': fund_id, 'user_id': user.id}) data = await fund_collect.find_one({'id': fund_id, 'user_id': user.id})
if not data: assert data, NotFundError()
raise NotFundError()
if data['fund_type'] == FundType.staking: if data['fund_type'] == FundType.staking:
response = Response[StakingFund](data=StakingFund(**data).dict()) response = Response[StakingFund](data=StakingFund(**data).dict())
else: else:
......
...@@ -19,8 +19,8 @@ async def create( ...@@ -19,8 +19,8 @@ async def create(
fund_collect: AgnosticCollection = Depends(get_fund_collect), fund_collect: AgnosticCollection = Depends(get_fund_collect),
scheduler: AsyncIOScheduler = Depends(get_scheduler) scheduler: AsyncIOScheduler = Depends(get_scheduler)
): ):
await calculate_nav_task(fund_id, scheduler, fund_collect, user.id) job_id = await calculate_nav_task(fund_id, scheduler, fund_collect, user.id)
return BaseResponse(message='创建成功') return BaseResponse(message='创建成功', data={"job_id": job_id})
@router.delete('/', @router.delete('/',
......
...@@ -4,7 +4,10 @@ from configs import env, make_dburi ...@@ -4,7 +4,10 @@ from configs import env, make_dburi
class Settings: class Settings:
env = env env = env
name = '本地环境' name = '本地环境'
mongodb_uri = make_dburi(schema='mongodb', sock_path='13.115.26.128:27018', user_name='root', pwd='ETHQig66tzxoZc+wuIPEUTMVsY') py_fund_mongodb_uri = make_dburi(schema='mongodb', sock_path='13.115.26.128:27018',
user_name='root', pwd='ETHQig66tzxoZc+wuIPEUTMVsY')
jasper_mongodb_uri = make_dburi(schema='mongodb', sock_path='13.115.26.128:27018',
user_name='root', pwd='ETHQig66tzxoZc+wuIPEUTMVsY') # 数据库版本问题
# mongodb = mongodb_url('127.0.0.1:27017') # mongodb = mongodb_url('127.0.0.1:27017')
redis_uri = make_dburi(schema='redis', sock_path='13.115.26.128:6379', pwd='MukAzxGMOL2') redis_uri = make_dburi(schema='redis', sock_path='13.115.26.128:6379', pwd='MukAzxGMOL2')
......
...@@ -5,7 +5,8 @@ from db.redis_helper import AioRedisManager ...@@ -5,7 +5,8 @@ from db.redis_helper import AioRedisManager
def register_mongodb(app): def register_mongodb(app):
mongodb_manager = AioMongodbManager() mongodb_manager = AioMongodbManager()
mongodb_manager.setup_pool(settings.mongodb_uri, 'pyfund') mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund')
mongodb_manager.setup_pool(settings.jasper_mongodb_uri, 'jasper')
app.state.mongodb_manager = mongodb_manager app.state.mongodb_manager = mongodb_manager
......
...@@ -43,6 +43,11 @@ def get_bill_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_ma ...@@ -43,6 +43,11 @@ def get_bill_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_ma
return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='bill') return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='bill')
def get_hour_price_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_manager)) -> AgnosticCollection:
# return mongodb_manager.get_client(name='jasper', db='TradeCenter_c', collect='HourMarketQuotes') # todo 数据库版本问题
return mongodb_manager.get_client(name='jasper', db='Market', collect='HourMarketQuotes')
# 获取redis Client # 获取redis Client
def get_cmc_price_redis(redis_manager: AioRedisManager = Depends(get_redis_manager)): def get_cmc_price_redis(redis_manager: AioRedisManager = Depends(get_redis_manager)):
return redis_manager.get_client(name='cmc_price') return redis_manager.get_client(name='cmc_price')
...@@ -71,7 +71,7 @@ async def startup(): ...@@ -71,7 +71,7 @@ async def startup():
app.include_router(api_router) app.include_router(api_router)
# 添加定时任务 # 添加定时任务
app.state.scheduler = create_scheduler(settings.mongodb_uri) app.state.scheduler = create_scheduler(settings.py_fund_mongodb_uri)
app.state.scheduler.start() app.state.scheduler.start()
app.state.scheduler.print_jobs() app.state.scheduler.print_jobs()
......
...@@ -4,7 +4,7 @@ from typing import List, Optional ...@@ -4,7 +4,7 @@ from typing import List, Optional
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from model import BaseCreateModel from model import BaseCreateModel
from model.asset import NormalAsset from model.asset import NormalAsset, BaseCoinAssetItem
from model.node import BaseNode from model.node import BaseNode
...@@ -53,7 +53,7 @@ class NormalFund(BaseFundItem, BaseCreateModel): ...@@ -53,7 +53,7 @@ class NormalFund(BaseFundItem, BaseCreateModel):
user_id: str user_id: str
user_email: str user_email: str
nav: Optional[float] = Field(default=1, description='当前净值') nav: Optional[float] = Field(default=1, description='当前净值')
assets: List[NormalAsset] = Field(default=[], description='持仓') assets: List[BaseCoinAssetItem] = Field(default=[], description='持仓')
class StakingFund(BaseFundItem, BaseCreateModel): class StakingFund(BaseFundItem, BaseCreateModel):
...@@ -61,4 +61,4 @@ class StakingFund(BaseFundItem, BaseCreateModel): ...@@ -61,4 +61,4 @@ class StakingFund(BaseFundItem, BaseCreateModel):
user_email: str user_email: str
nav: Optional[float] = Field(default=1, description='当前净值') nav: Optional[float] = Field(default=1, description='当前净值')
nodes: List[BaseNode] nodes: List[BaseNode]
assets: List[NormalAsset] = Field(default=[], description='持仓') assets: List[BaseCoinAssetItem] = Field(default=[], description='持仓')
...@@ -15,7 +15,7 @@ class BaseNode(BaseCreateModel): ...@@ -15,7 +15,7 @@ class BaseNode(BaseCreateModel):
pub_key: str = Field(..., description='绑定的key') pub_key: str = Field(..., description='绑定的key')
status: NodeStatus = Field(default=NodeStatus.pending, description='状态') status: NodeStatus = Field(default=NodeStatus.pending, description='状态')
currency: str = Field(default='ETH', description='节点需要质押的币种') currency: str = Field(default='ETH', description='节点需要质押的币种')
value: float = Field(default=32, description='节点需要质押的币种数量') volume: float = Field(default=32, description='节点需要质押的币种数量')
# 接口请求模型 创建 # 接口请求模型 创建
...@@ -23,4 +23,4 @@ class CreateNode(BaseModel): ...@@ -23,4 +23,4 @@ class CreateNode(BaseModel):
pub_key: str = Field(..., description='绑定的key') pub_key: str = Field(..., description='绑定的key')
fund_id: str = Field(..., description='绑定基金的ID') fund_id: str = Field(..., description='绑定基金的ID')
currency: str = Field(default='ETH', description='节点需要质押的币种') currency: str = Field(default='ETH', description='节点需要质押的币种')
value: float = Field(default=32, description='节点需要质押的币种数量') volume: float = Field(default=32, description='节点需要质押的币种数量')
import datetime
import pytz
from loguru import logger from loguru import logger
from dependencies import get_fund_collect, get_cmc_price_redis from dependencies import get_fund_collect, get_cmc_price_redis, get_hour_price_collect
from model.fund import FundStatus from model.fund import FundStatus
from tools.price_helper import get_price
async def calculate_nav(fund_id): async def calculate_nav(fund_id, set_time=False):
from main import app from main import app
redis_client = get_cmc_price_redis(app.state.redis_manager)
fund_collect = get_fund_collect(app.state.mongodb_manager) fund_collect = get_fund_collect(app.state.mongodb_manager)
logger.info(f'[定时任务开始执行] [计算净值] {fund_id} ') logger.info(f'[定时任务开始执行] [计算净值] {fund_id} ')
fund_data = await fund_collect.find_one({'id': fund_id, 'fund_status': FundStatus.active}) fund_data = await fund_collect.find_one({'id': fund_id, 'fund_status': FundStatus.active})
for coin_item in fund_data['assets']: amount = {}
data = await redis_client.hget(f'CMC:CoinPrice:{coin_item["currency"]}', 'Price') for item in fund_data['assets']:
print(f'获取到报价 {data}') amount.setdefault(item['currency'], 0)
# todo 未完成具体实现 amount[item['currency']] += item['volume']
# if not set_time:
# coin_price = await redis_client.hget(f'CMC:CoinPrice:{coin_item["currency"]}', 'Price')
for item in fund_data['nodes']:
amount.setdefault(item['currency'], 0)
amount[item['currency']] += item['volume']
set_time = datetime.datetime.utcnow().replace(minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
if set_time:
hour_price_collect = get_hour_price_collect(app.state.mongodb_manager)
price_data = await get_price(list(amount), set_time, hour_price_collect)
else:
redis_client = get_cmc_price_redis(app.state.redis_manager)
price_data = await get_price(list(amount), None, redis_client)
print(f'获取到报价 {price_data}')
# todo 未完成具体实现
...@@ -37,3 +37,4 @@ async def calculate_nav_task(fund_id, schedular, fund_collect: AgnosticCollectio ...@@ -37,3 +37,4 @@ async def calculate_nav_task(fund_id, schedular, fund_collect: AgnosticCollectio
trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC), trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC),
misfire_grace_time=10) misfire_grace_time=10)
schedular.print_jobs() schedular.print_jobs()
return job_id
async def get_price(coin_list, query_time, db_client):
if not query_time:
temp_index = []
result_dict = {}
async with db_client.pipeline(transaction=True) as pipe:
for coin_item in coin_list:
pipe.hget(f'CMC:CoinPrice:{coin_item}', 'Price')
temp_index.append(coin_item)
data = await pipe.execute()
for coin_name, result_item in zip(temp_index, data):
result_dict[coin_name] = float(result_item)
else:
cursor = db_client.find(
{'ReqTime': query_time, "SequenceCode": "CMC", "PayCurrency": "USD", "MOSymbol": {"$in": coin_list}})
result_dict = {}
temp_res = await cursor.to_list(length=None)
for item in temp_res:
result_dict[item['MOSymbol']] = item['Price']
return result_dict
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment