Commit 9904f5dd authored by 陈涛's avatar 陈涛

计算staking基金净值,查询净值记录

parent dd3aff4b
from fastapi import APIRouter from fastapi import APIRouter, Depends
from motor.core import AgnosticCollection
from dependencies import get_nav_collect
from model import SortParams, FilterTime, Page, PageResponse
from model.fund import StakingFundNav
from service.nav import calculate_nav
router = APIRouter() router = APIRouter()
@router.get("/execute/{fund_id}/")
async def test_nav(fund_id):
response = await calculate_nav(fund_id)
return response
@router.get("/{fund_id}/", response_model=PageResponse[StakingFundNav], summary="查询净值记录")
async def nav(
fund_id: str,
sort_by: SortParams = Depends(SortParams),
filter_time: FilterTime = Depends(FilterTime),
page: Page = Depends(Page),
nav_collect: AgnosticCollection = Depends(get_nav_collect)
):
query = {"fund_id": fund_id}
if filter_time.start_time and filter_time.end_time:
query.update({'record_date': filter_time.to_mongodb_query()})
count = await nav_collect.count_documents(query)
skip = (page.page - 1) * page.page_size
cursor = nav_collect.find(query)
cursor = cursor.skip(skip).sort([(sort_by.sort_field, sort_by.sort_direction)]).limit(page.page_size)
result = await cursor.to_list(length=None)
response = PageResponse[StakingFundNav](data=result, **page.dict(), total=count)
return response
...@@ -51,6 +51,10 @@ def get_cache_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_m ...@@ -51,6 +51,10 @@ def get_cache_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_m
return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='cache') return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='cache')
def get_nav_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_manager)) -> AgnosticCollection:
return mongodb_manager.get_client(name='pyfund', db='pyfund', collect='nav')
# 获取redis Client # 获取redis Client
def get_cmc_price_redis(redis_manager: AioRedisManager = Depends(get_redis_manager)) -> AioRedisManager: def get_cmc_price_redis(redis_manager: AioRedisManager = Depends(get_redis_manager)) -> AioRedisManager:
return redis_manager.get_client(name='cmc_price') return redis_manager.get_client(name='cmc_price')
import datetime import datetime
import uuid import uuid
from enum import IntEnum from enum import IntEnum
from typing import Any, TypeVar, Generic, Optional, List from typing import Any, TypeVar, Generic, Optional, List, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from pydantic.generics import GenericModel from pydantic.generics import GenericModel
...@@ -28,8 +28,8 @@ class SortParams(BaseModel): ...@@ -28,8 +28,8 @@ class SortParams(BaseModel):
class FilterTime(BaseModel): class FilterTime(BaseModel):
start_time: Optional[int] = Field(None, description='查询开始时间') start_time: Optional[Union[int, datetime.datetime]] = Field(None, description='查询开始时间')
end_time: Optional[int] = Field(None, description="查询结束时间") end_time: Optional[Union[int, datetime.datetime]] = Field(None, description="查询结束时间")
def to_mongodb_query(self): def to_mongodb_query(self):
return {"$gte": self.start_time, "$lte": self.end_time} return {"$gte": self.start_time, "$lte": self.end_time}
......
import datetime import datetime
from typing import Optional, Dict from typing import Optional, Dict
import pytz
from pydantic import Field from pydantic import Field
from model import MyBaseModel from model import MyBaseModel
...@@ -43,3 +44,11 @@ class StakingFund(BaseFundItem): ...@@ -43,3 +44,11 @@ class StakingFund(BaseFundItem):
pending_assets: Dict[str, float] = Field(default={}, description='pending资产') pending_assets: Dict[str, float] = Field(default={}, description='pending资产')
staking_assets: Dict[str, float] = Field(default={}, description='质押资产') staking_assets: Dict[str, float] = Field(default={}, description='质押资产')
assets_update: float = Field(default_factory=utc_now_timestamp, description='余额更新时间') assets_update: float = Field(default_factory=utc_now_timestamp, description='余额更新时间')
class StakingFundNav(StakingFund):
fund_id: str = Field(..., description="基金id")
record_date: datetime.datetime = Field(
default_factory=lambda: datetime.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.UTC),
description="记录时间"
)
\ No newline at end of file
import datetime import datetime
import uuid
import pytz import pytz
from fastapi import Depends
from loguru import logger from loguru import logger
from dependencies import get_fund_collect, get_cmc_price_redis, get_hour_price_collect from dependencies import get_fund_collect, get_cmc_price_redis, get_hour_price_collect, get_bill_collect, \
get_nav_collect
from model.fund import FundStatus from model.fund import FundStatus
from service.beacon import BeaconChaService from service.beacon import BeaconChaService
from service.price import get_price from service.price import get_price
async def calculate_nav(fund_id, calc_time=None, beach_service: BeaconChaService = Depends(BeaconChaService)): async def calculate_nav(fund_id, calc_time=None, beach_service=BeaconChaService()):
# todo 测试 # todo 测试
calc_time = datetime.datetime.utcnow().replace(minute=0, second=0, microsecond=0, calc_time = datetime.datetime.utcnow().replace(minute=0, second=0, microsecond=0,
tzinfo=pytz.UTC) - datetime.timedelta(hours=2) tzinfo=pytz.UTC) - datetime.timedelta(hours=2)
date = datetime.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
from main import app from main import app
fund_collect = get_fund_collect(app.state.mongodb_manager) fund_collect = get_fund_collect(app.state.mongodb_manager)
logger.info(f'[定时任务开始执行] [计算净值] {fund_id} ') logger.info(f'[定时任务开始执行] [计算净值] {fund_id} ')
...@@ -29,8 +31,6 @@ async def calculate_nav(fund_id, calc_time=None, beach_service: BeaconChaService ...@@ -29,8 +31,6 @@ async def calculate_nav(fund_id, calc_time=None, beach_service: BeaconChaService
amount.setdefault(key, 0) amount.setdefault(key, 0)
amount[key] += value amount[key] += value
#
# 查询节点收益 # 查询节点收益
for key, value in fund_data["nodes"].items(): for key, value in fund_data["nodes"].items():
if value["status"] == "active": if value["status"] == "active":
...@@ -43,10 +43,6 @@ async def calculate_nav(fund_id, calc_time=None, beach_service: BeaconChaService ...@@ -43,10 +43,6 @@ async def calculate_nav(fund_id, calc_time=None, beach_service: BeaconChaService
amount.setdefault(key, 0) amount.setdefault(key, 0)
amount[key] += value amount[key] += value
# for item in fund_data.get('nodes', []):
# amount.setdefault(item['currency'], 0)
# amount[item['currency']] += item['volume']
if calc_time: if calc_time:
# mongodb 查询历史数据 # mongodb 查询历史数据
hour_price_collect = get_hour_price_collect(app.state.mongodb_manager) hour_price_collect = get_hour_price_collect(app.state.mongodb_manager)
...@@ -65,4 +61,34 @@ async def calculate_nav(fund_id, calc_time=None, beach_service: BeaconChaService ...@@ -65,4 +61,34 @@ async def calculate_nav(fund_id, calc_time=None, beach_service: BeaconChaService
for symbol, volume in amount.items(): for symbol, volume in amount.items():
nav[symbol] = volume * price_data[symbol] nav[symbol] = volume * price_data[symbol]
logger.info(f'[资产情况] [{fund_id}] {nav}') logger.info(f'[资产情况] [{fund_id}] {nav}')
return nav bill_collect = get_bill_collect(app.state.mongodb_manager)
cursor = bill_collect.find({
"bill_type": {"$in": ["adjust", "sub", "redemption"]}
})
result = await cursor.to_list(length=None)
total_fund_share = sum(-item["fund_share"] if item["bill_type"] == "redemption" else item["fund_share"] for item in result)
value = sum(nav.values())
net_value = round(value / total_fund_share, 2)
nav_collect = get_nav_collect(app.state.mongodb_manager)
nav_record = await nav_collect.find_one({"fund_id": fund_id, "record_date": date})
if nav_record:
error = f'重复净值记录 [{nav_record}] [mongo] [{date}]'
logger.error(error)
raise Exception(error)
fund_nav = {}
for key, value in fund_data.items():
if key == "_id":
continue
elif key == "id":
fund_nav["fund_id"] = value
fund_nav["id"] = uuid.uuid1().__str__()
else:
fund_nav[key] = value
fund_nav["record_date"] = date
fund_nav["net_value"] = net_value
insert = await nav_collect.insert_one(fund_nav)
if not insert.acknowledged:
error = f'写入净值记录失败 [{fund_nav}] [mongo] [{date}]'
logger.error(error)
raise Exception(error)
return fund_nav
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment