Commit 228ac7eb authored by 陈涛's avatar 陈涛

净值修改,提交日志

parent 1dcac753
import datetime
import re
from typing import Union, List, Any from typing import Union, List, Any
import pytz
from fastapi import APIRouter, Depends, Query from fastapi import APIRouter, Depends, Query
from motor.core import AgnosticCollection from motor.core import AgnosticCollection
from pymongo import UpdateOne
from dependencies import get_current_user, get_fund_collect, get_bill_collect, get_permission_user_collect, \ from dependencies import get_current_user, get_fund_collect, get_bill_collect, get_permission_user_collect, \
get_permission_role_collect get_permission_role_collect, get_nav_collect
from model import Response, Page, PageResponse, SortParams, FilterTime from model import Response, Page, PageResponse, SortParams, FilterTime
from model.bill import PCFBill, ExchangeBill, AdjustBill, StakingBill from model.bill import PCFBill, ExchangeBill, AdjustBill, StakingBill
from model.node import BaseNode from model.node import BaseNode
...@@ -13,9 +17,10 @@ from schema.fund import FundStatus ...@@ -13,9 +17,10 @@ from schema.fund import FundStatus
from schema.node import BindNode from schema.node import BindNode
from service.beacon import BeaconChaService from service.beacon import BeaconChaService
from service.bill import update_bill from service.bill import update_bill
from service.fund import query_fund_assets_and_nodes, update_fund from service.fund import query_fund_assets_and_nodes, update_fund, get_nav_record
from service.permission import check_permission from service.permission import check_permission
from tools.jwt_tools import User from tools.jwt_tools import User
from tools.time_helper import timestamp_to_datetime
router = APIRouter() router = APIRouter()
bill_type_to_permission = { bill_type_to_permission = {
...@@ -27,6 +32,20 @@ bill_type_to_permission = { ...@@ -27,6 +32,20 @@ bill_type_to_permission = {
} }
async def nav_handler(nav_collect, fund_id, record_time, settlement_time):
record_date = timestamp_to_datetime(record_time)
re_time = re.findall(r"(\d+):00", settlement_time)[0]
if record_date.hour < re_time:
record_date += datetime.timedelta(days=-1)
start_date = record_date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
nav_record = await get_nav_record(
nav_collect=nav_collect,
fund_id=fund_id,
query_data={"record_time": {"$gte": start_date}}
)
return nav_record
@router.post('/pcf/', @router.post('/pcf/',
response_model=Response[PCFBill], response_model=Response[PCFBill],
tags=['[申购/赎回]'], tags=['[申购/赎回]'],
...@@ -36,6 +55,7 @@ async def create_pcf( ...@@ -36,6 +55,7 @@ async def create_pcf(
create_pcf_bill: CreatePCFBill, create_pcf_bill: CreatePCFBill,
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect), fund_collect: AgnosticCollection = Depends(get_fund_collect),
nav_collect: AgnosticCollection = Depends(get_nav_collect),
bill_collect: AgnosticCollection = Depends(get_bill_collect), bill_collect: AgnosticCollection = Depends(get_bill_collect),
permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect), permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect),
permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect) permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect)
...@@ -45,16 +65,27 @@ async def create_pcf( ...@@ -45,16 +65,27 @@ async def create_pcf(
user.email, permission_user_collect, user.email, permission_user_collect,
permission_role_collect) permission_role_collect)
delta_volume = create_pcf_bill.volume if create_pcf_bill.bill_type == PCFBillType.sub else -create_pcf_bill.volume delta_volume = create_pcf_bill.volume if create_pcf_bill.bill_type == PCFBillType.sub else -create_pcf_bill.volume
fund_data = await query_fund_assets_and_nodes(
assets, adjust_assets, pending_assets, staking_assets, nodes = await query_fund_assets_and_nodes(
fund_collect=fund_collect, fund_collect=fund_collect,
fund_id=create_pcf_bill.fund_id, fund_id=create_pcf_bill.fund_id,
fund_status=FundStatus.active) fund_status=FundStatus.active)
assets = fund_data["assets"]
assets.setdefault(create_pcf_bill.currency, 0) assets.setdefault(create_pcf_bill.currency, 0)
# 如果是赎回 判断余额是否够 # # 如果是赎回 判断余额是否够
assert assets[create_pcf_bill.currency] + delta_volume >= 0, "余额不足" # assert assets[create_pcf_bill.currency] + delta_volume >= 0, "余额不足"
assets[create_pcf_bill.currency] += delta_volume assets[create_pcf_bill.currency] += delta_volume
nav_record = await nav_handler(
nav_collect=nav_collect, fund_id=create_pcf_bill.fund_id,
record_time=create_pcf_bill.record_time, settlement_time=fund_data["settlement_time"]
)
bulk_list = []
for nav in nav_record:
nav["assets"].setdefault(create_pcf_bill.currency, 0)
nav["assets"][create_pcf_bill.currency] += delta_volume
bulk_list.append(UpdateOne({"_id": nav["_id"]}, {"$set": {"assets": nav["assets"]}}))
await update_fund(fund_collect, create_pcf_bill.fund_id, assets=assets) await update_fund(fund_collect, create_pcf_bill.fund_id, assets=assets)
await nav_collect.bulk_write(bulk_list)
pcf = PCFBill(user_id=user.id, **create_pcf_bill.dict()) pcf = PCFBill(user_id=user.id, **create_pcf_bill.dict())
await bill_collect.insert_one(pcf.dict()) await bill_collect.insert_one(pcf.dict())
return Response[PCFBill](data=pcf.dict()) return Response[PCFBill](data=pcf.dict())
...@@ -69,6 +100,7 @@ async def create_exchange( ...@@ -69,6 +100,7 @@ async def create_exchange(
create_exchange_bill: CreateExchangeBill, create_exchange_bill: CreateExchangeBill,
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect), fund_collect: AgnosticCollection = Depends(get_fund_collect),
nav_collect: AgnosticCollection = Depends(get_nav_collect),
bill_collect: AgnosticCollection = Depends(get_bill_collect), bill_collect: AgnosticCollection = Depends(get_bill_collect),
permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect), permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect),
permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect) permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect)
...@@ -77,19 +109,30 @@ async def create_exchange( ...@@ -77,19 +109,30 @@ async def create_exchange(
create_exchange_bill.fund_id, create_exchange_bill.fund_id,
user.email, permission_user_collect, user.email, permission_user_collect,
permission_role_collect) permission_role_collect)
assets, adjust_assets, pending_assets, staking_assets, nodes = await query_fund_assets_and_nodes( fund_data = await query_fund_assets_and_nodes(
fund_collect=fund_collect, fund_collect=fund_collect,
fund_id=create_exchange_bill.fund_id, fund_id=create_exchange_bill.fund_id,
fund_status=FundStatus.active) fund_status=FundStatus.active)
assets = fund_data["assets"]
assets.setdefault(create_exchange_bill.output_currency, 0) assets.setdefault(create_exchange_bill.output_currency, 0)
assets.setdefault(create_exchange_bill.input_currency, 0) assets.setdefault(create_exchange_bill.input_currency, 0)
assert assets[ # assert assets[create_exchange_bill.input_currency] >= create_exchange_bill.input_volume, f"{create_exchange_bill.input_currency}余额不足"
create_exchange_bill.input_currency] >= create_exchange_bill.input_volume, f"{create_exchange_bill.input_currency}余额不足"
assets[create_exchange_bill.input_currency] -= create_exchange_bill.input_volume assets[create_exchange_bill.input_currency] -= create_exchange_bill.input_volume
assets[create_exchange_bill.output_currency] += create_exchange_bill.output_volume assets[create_exchange_bill.output_currency] += create_exchange_bill.output_volume
nav_record = await nav_handler(
nav_collect=nav_collect, fund_id=create_exchange_bill.fund_id,
record_time=create_exchange_bill.record_time, settlement_time=fund_data["settlement_time"]
)
bulk_list = []
for nav in nav_record:
nav["assets"].setdefault(create_exchange_bill.output_currency, 0)
nav["assets"].setdefault(create_exchange_bill.input_currency, 0)
nav["assets"][create_exchange_bill.input_currency] -= create_exchange_bill.input_volume
nav["assets"][create_exchange_bill.output_currency] += create_exchange_bill.output_volume
bulk_list.append(UpdateOne({"_id": nav["_id"]}, {"$set": {"assets": nav["assets"]}}))
await update_fund(fund_collect, create_exchange_bill.fund_id, assets=assets) await update_fund(fund_collect, create_exchange_bill.fund_id, assets=assets)
await nav_collect.bulk_write(bulk_list)
input_value, output_value = create_exchange_bill.input_volume * create_exchange_bill.input_price, create_exchange_bill.output_volume * create_exchange_bill.output_price input_value, output_value = create_exchange_bill.input_volume * create_exchange_bill.input_price, create_exchange_bill.output_volume * create_exchange_bill.output_price
exchange_bill = ExchangeBill(user_id=user.id, input_value=input_value, output_value=output_value, exchange_bill = ExchangeBill(user_id=user.id, input_value=input_value, output_value=output_value,
...@@ -107,6 +150,7 @@ async def create_adjust( ...@@ -107,6 +150,7 @@ async def create_adjust(
create_adjust_bill: CreateAdjustBill, create_adjust_bill: CreateAdjustBill,
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect), fund_collect: AgnosticCollection = Depends(get_fund_collect),
nav_collect: AgnosticCollection = Depends(get_nav_collect),
bill_collect: AgnosticCollection = Depends(get_bill_collect), bill_collect: AgnosticCollection = Depends(get_bill_collect),
permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect), permission_user_collect: AgnosticCollection = Depends(get_permission_user_collect),
permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect) permission_role_collect: AgnosticCollection = Depends(get_permission_role_collect)
...@@ -116,16 +160,30 @@ async def create_adjust( ...@@ -116,16 +160,30 @@ async def create_adjust(
user.email, permission_user_collect, user.email, permission_user_collect,
permission_role_collect) permission_role_collect)
assets, adjust_assets, pending_assets, staking_assets, nodes = await query_fund_assets_and_nodes( fund_data = await query_fund_assets_and_nodes(
fund_collect=fund_collect, fund_collect=fund_collect,
fund_id=create_adjust_bill.fund_id, fund_id=create_adjust_bill.fund_id,
fund_status=FundStatus.active) fund_status=FundStatus.active
)
adjust_assets = fund_data["adjust_assets"]
adjust_assets.setdefault(create_adjust_bill.currency, 0) adjust_assets.setdefault(create_adjust_bill.currency, 0)
adjust_assets.setdefault('fund_share', 0) adjust_assets.setdefault('fund_share', 0)
adjust_assets[create_adjust_bill.currency] += create_adjust_bill.volume adjust_assets[create_adjust_bill.currency] += create_adjust_bill.volume
adjust_assets['fund_share'] += create_adjust_bill.fund_share adjust_assets['fund_share'] += create_adjust_bill.fund_share
nav_record = await nav_handler(
nav_collect=nav_collect, fund_id=create_adjust_bill.fund_id,
record_time=create_adjust_bill.record_time, settlement_time=fund_data["settlement_time"]
)
bulk_list = []
for nav in nav_record:
nav["assets"].setdefault(create_adjust_bill.currency, 0)
nav["assets"][create_adjust_bill.currency] += create_adjust_bill.volume
bulk_list.append(UpdateOne({"_id": nav["_id"]}, {"$set": {"assets": nav["assets"]}}))
await update_fund(fund_collect, create_adjust_bill.fund_id, adjust_assets=adjust_assets) await update_fund(fund_collect, create_adjust_bill.fund_id, adjust_assets=adjust_assets)
await nav_collect.bulk_write(bulk_list)
adjust_bill = AdjustBill(user_id=user.id, **create_adjust_bill.dict()) adjust_bill = AdjustBill(user_id=user.id, **create_adjust_bill.dict())
await bill_collect.insert_one(adjust_bill.dict()) await bill_collect.insert_one(adjust_bill.dict())
response = Response[AdjustBill](data=adjust_bill.dict()) response = Response[AdjustBill](data=adjust_bill.dict())
...@@ -151,9 +209,12 @@ async def create_staking_api( ...@@ -151,9 +209,12 @@ async def create_staking_api(
user.email, permission_user_collect, user.email, permission_user_collect,
permission_role_collect) permission_role_collect)
assets, adjust_assets, pending_assets, staking_assets, nodes = await query_fund_assets_and_nodes(fund_collect, fund_data = await query_fund_assets_and_nodes(
fund_collect,
fund_id=create_staking_bill.fund_id, fund_id=create_staking_bill.fund_id,
fund_status=FundStatus.active) fund_status=FundStatus.active
)
assets, pending_assets, nodes = fund_data["assets"], fund_data["pending_assets"], fund_data["nodes"]
assert assets.get(create_staking_bill.currency, 0) >= create_staking_bill.volume + create_staking_bill.fee, '余额不足' assert assets.get(create_staking_bill.currency, 0) >= create_staking_bill.volume + create_staking_bill.fee, '余额不足'
assets[create_staking_bill.currency] -= create_staking_bill.volume + create_staking_bill.fee assets[create_staking_bill.currency] -= create_staking_bill.volume + create_staking_bill.fee
# 防止增加的币种没有 设置默认值 # 防止增加的币种没有 设置默认值
......
...@@ -20,12 +20,11 @@ router = APIRouter() ...@@ -20,12 +20,11 @@ router = APIRouter()
async def create( async def create(
request: Request, request: Request,
fund_id: str, fund_id: str,
background_tasks: BackgroundTasks, settlement_time: str
user: User = Depends(get_current_user),
): ):
scheduler = request.app.state.scheduler scheduler = request.app.state.scheduler
job_id = f"calculate_nav_{fund_id}" job_id = f"calculate_nav_{fund_id}"
time_obj = datetime.strptime("08:00", "%H:%M") time_obj = datetime.strptime(settlement_time, "%H:%M")
scheduler.add_job( scheduler.add_job(
calculate_nav_task, calculate_nav_task,
trigger="cron", trigger="cron",
...@@ -35,7 +34,6 @@ async def create( ...@@ -35,7 +34,6 @@ async def create(
args=[fund_id], args=[fund_id],
id=job_id id=job_id
) )
# background_tasks.add_task(calculate_nav_task, fund_id)
return BaseResponse(message='创建成功') return BaseResponse(message='创建成功')
......
from typing import Tuple from motor.core import AgnosticCollection
from exception.db import NotFundError from exception.db import NotFundError
from schema.fund import FundStatus from schema.fund import FundStatus
from tools.time_helper import utc_now_timestamp from tools.time_helper import utc_now_timestamp
async def query_fund_assets_and_nodes(fund_collect, fund_id, user_id=None, fund_status=None) -> Tuple[ async def query_fund_assets_and_nodes(fund_collect, fund_id, user_id=None, fund_status=None):
dict, dict, dict, dict, dict]:
query = {'id': fund_id} query = {'id': fund_id}
if user_id: if user_id:
query.update({"user_id": user_id}) query.update({"user_id": user_id})
...@@ -15,7 +13,7 @@ async def query_fund_assets_and_nodes(fund_collect, fund_id, user_id=None, fund_ ...@@ -15,7 +13,7 @@ async def query_fund_assets_and_nodes(fund_collect, fund_id, user_id=None, fund_
fund = await fund_collect.find_one(query) fund = await fund_collect.find_one(query)
if not fund: if not fund:
raise NotFundError() raise NotFundError()
return fund['assets'], fund['adjust_assets'], fund['pending_assets'], fund['staking_assets'], fund['nodes'] return fund
# 修改资产 # 修改资产
...@@ -45,3 +43,12 @@ async def update_fund(fund_collect, fund_id, *, assets=None, adjust_assets=None, ...@@ -45,3 +43,12 @@ async def update_fund(fund_collect, fund_id, *, assets=None, adjust_assets=None,
if nodes: if nodes:
update_data.update({'nodes': nodes}) update_data.update({'nodes': nodes})
await fund_collect.update_one(query, {"$set": update_data}) await fund_collect.update_one(query, {"$set": update_data})
async def get_nav_record(nav_collect: AgnosticCollection, fund_id: str, query_data: {}):
query = {"fund_id": fund_id}
if query_data:
query.update(query_data)
cursor = nav_collect.find(query)
nav_result = await cursor.to_list(length=None)
return nav_result
...@@ -54,12 +54,20 @@ async def calculate_nav(fund_id, calc_time: datetime.datetime = None, beach_serv ...@@ -54,12 +54,20 @@ async def calculate_nav(fund_id, calc_time: datetime.datetime = None, beach_serv
amount.setdefault(key, 0) amount.setdefault(key, 0)
amount[key] += value amount[key] += value
# 查询节点收益 # 查询节点收益, 如果是重新计算净值,则不查询收益
if update_fund:
node_income_assets = fund_data.get("node_income_assets", {})
for key, value in fund_data["nodes"].items(): for key, value in fund_data["nodes"].items():
if value["status"] == "active": if value["status"] == "active":
income = await beach_service.get_validator_income(key) income = beach_service.get_rewards(key)
amount.setdefault("ETH", 0) node_income_assets.setdefault("ETH", 0)
amount["ETH"] += income[0].performancetotal / (10 ** 9) if income else 0 node_income_assets["ETH"] += income.total_rewards.total if income else 0
fund_data["node_income_assets"] = node_income_assets
else:
pass
for key, value in fund_data["node_income_assets"].items():
amount.setdefault(key, 0)
amount[key] += value
# 节点质押资产 # 节点质押资产
for key, value in fund_data["staking_assets"].items(): for key, value in fund_data["staking_assets"].items():
...@@ -80,6 +88,7 @@ async def calculate_nav(fund_id, calc_time: datetime.datetime = None, beach_serv ...@@ -80,6 +88,7 @@ async def calculate_nav(fund_id, calc_time: datetime.datetime = None, beach_serv
error = f'缺少报价计算净值 [{lost_data}] [{"mongodb" if calc_time else "redis"}] [{calc_time}]' error = f'缺少报价计算净值 [{lost_data}] [{"mongodb" if calc_time else "redis"}] [{calc_time}]'
logger.error(error) logger.error(error)
raise Exception(error) raise Exception(error)
logger.info(f'[查询报价] | [{fund_id}] {price_data} | query_time:{calc_time - datetime.timedelta(hours=1)}')
nav = {} nav = {}
for symbol, volume in amount.items(): for symbol, volume in amount.items():
nav[symbol] = volume * price_data[symbol] nav[symbol] = volume * price_data[symbol]
......
...@@ -93,10 +93,12 @@ async def update_staking_node_status_task(beacon_service: BeaconChaService, mong ...@@ -93,10 +93,12 @@ async def update_staking_node_status_task(beacon_service: BeaconChaService, mong
if status == StakingBillStatus.finish: if status == StakingBillStatus.finish:
# 更新fund状态 # 更新fund状态
fund_collect = get_fund_collect(mongodb_manager) fund_collect = get_fund_collect(mongodb_manager)
assets, adjust_assets, pending_assets, staking_assets, nodes = await query_fund_assets_and_nodes( fund_data = await query_fund_assets_and_nodes(
fund_collect, fund_collect,
bill_item.fund_id, bill_item.fund_id,
fund_status=FundStatus.active) fund_status=FundStatus.active
)
staking_assets, pending_assets, nodes = fund_data["staking_assets"], fund_data["pending_assets"], fund_data["nodes"]
staking_assets.setdefault(bill_item.currency, 0) staking_assets.setdefault(bill_item.currency, 0)
staking_assets[bill_item.currency] += bill_item.volume staking_assets[bill_item.currency] += bill_item.volume
pending_assets[bill_item.currency] -= bill_item.volume pending_assets[bill_item.currency] -= bill_item.volume
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment