Commit a3dd11ac authored by 杨明橙's avatar 杨明橙

1.修改nodes为dict

2.修改更新 质押状态后 同时更新节点状态
parent 31674ffa
......@@ -9,7 +9,7 @@ from schema.bill import CreatePCFBill, PCFBillType, CreateExchangeBill, CreateAd
UpdatePCFBill, UpdateExchangeBill, UpdateStakingBill, UpdateAdjustBill, AllBillType
from schema.fund import FundStatus
from service.bill import update_bill
from service.fund import query_fund_assets, update_assets
from service.fund import query_fund_assets, update_fund
from tools.jwt_tools import User
router = APIRouter()
......@@ -26,23 +26,15 @@ async def create_pcf(
fund_collect: AgnosticCollection = Depends(get_fund_collect),
bill_collect: AgnosticCollection = Depends(get_bill_collect),
):
inc = create_pcf_bill.volume if create_pcf_bill.bill_type == PCFBillType.sub else -create_pcf_bill.volume
delta_volume = create_pcf_bill.volume if create_pcf_bill.bill_type == PCFBillType.sub else -create_pcf_bill.volume
assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect, create_pcf_bill.fund_id, user.id,
FundStatus.active)
assets.setdefault(create_pcf_bill.currency, 0)
# 如果是赎回 判断余额是否够
assert assets[create_pcf_bill.currency] + inc >= 0, "余额不足"
assets[create_pcf_bill.currency] += inc
# if item.currency in assets:
# inc = item.volume if item.bill_type == PCFBillType.sub else -item.volume
# assert assets[item.currency] + inc >= 0, "余额不足"
# assets[item.currency] += inc
# else:
# assert item.bill_type == PCFBillType.sub, "余额不足"
# assets.update({item.currency: item.volume})
await update_assets(fund_collect, create_pcf_bill.fund_id, assets=assets)
assert assets[create_pcf_bill.currency] + delta_volume >= 0, "余额不足"
assets[create_pcf_bill.currency] += delta_volume
await update_fund(fund_collect, create_pcf_bill.fund_id, assets=assets)
pcf = PCFBill(user_id=user.id, **create_pcf_bill.dict())
await bill_collect.insert_one(pcf.dict())
return Response[PCFBill](data=pcf.dict())
......@@ -66,33 +58,12 @@ async def create_exchange(
assets.setdefault(create_exchange_bill.output_currency, 0)
assets.setdefault(create_exchange_bill.input_currency, 0)
assert assets[
create_exchange_bill.output_currency] >= create_exchange_bill.output_volume, f"{create_exchange_bill.output_currency}余额不足"
assets[create_exchange_bill.output_currency] -= create_exchange_bill.output_volume
assets[create_exchange_bill.input_currency] += create_exchange_bill.input_volume
create_exchange_bill.input_currency] >= create_exchange_bill.input_volume, f"{create_exchange_bill.input_currency}余额不足"
await update_assets(fund_collect, create_exchange_bill.fund_id, assets=assets)
assets[create_exchange_bill.input_currency] -= create_exchange_bill.input_volume
assets[create_exchange_bill.output_currency] += create_exchange_bill.output_volume
# fund = await fund_collect.find_one({"id": item.fund_id, "user_id": user.id})
# assert fund, NotFundError()
# filter_asset = list(filter(lambda x: x["currency"] == item.input_currency, fund["assets"]))
# assert filter_asset, f"{item.input_currency}余额不足"
# input_asset = filter_asset[0]
# assert input_asset["volume"] >= item.input_volume, f"{item.input_currency}余额不足"
# update_input = UpdateOne(
# {"id": item.fund_id, "assets.currency": item.input_currency},
# {"$inc": {"assets.$.volume": -item.input_volume}}
# )
# output_filter = list(filter(lambda x: x["currency"] == item.output_currency, fund["assets"]))
# update_output = UpdateOne(
# {"id": item.fund_id, "assets.currency": item.output_currency},
# {"$inc": {"assets.$.volume": item.output_volume}}
# ) if output_filter else UpdateOne(
# {"id": item.fund_id},
# {"$push": {"assets": {"currency": item.output_currency, "volume": item.output_volume}}}
# )
# result = await fund_collect.bulk_write([update_input, update_output])
# logger.info(result.modified_count)
await update_fund(fund_collect, create_exchange_bill.fund_id, assets=assets)
input_value, output_value = create_exchange_bill.input_volume * create_exchange_bill.input_price, create_exchange_bill.output_volume * create_exchange_bill.output_price
exchange_bill = ExchangeBill(user_id=user.id, input_value=input_value, output_value=output_value,
......@@ -120,18 +91,7 @@ async def create_adjust(
adjust_assets[create_adjust_bill.currency] += create_adjust_bill.volume
adjust_assets['fund_share'] += create_adjust_bill.fund_share
# query = {"id": item.fund_id, "user_id": user.id}
# result = await fund_collect.update_one(
# {**query, "adjust_assets": {"$not": {"$elemMatch": {"currency": item.currency}}}},
# {"$push": {"adjust_assets": {"currency": item.currency, "volume": item.volume}}}
# )
# if result.modified_count == 0:
# inc_result = await fund_collect.update_one(
# {**query, "adjust_assets.currency": item.currency},
# {"$inc": {"adjust_assets.$.volume": item.volume}}
# )
# logger.info(f"inc_result={inc_result.modified_count}")
await update_assets(fund_collect, create_adjust_bill.fund_id, adjust_assets=adjust_assets)
await update_fund(fund_collect, create_adjust_bill.fund_id, adjust_assets=adjust_assets)
adjust_bill = AdjustBill(user_id=user.id, **create_adjust_bill.dict())
await bill_collect.insert_one(adjust_bill.dict())
response = Response[AdjustBill](data=adjust_bill.dict())
......@@ -158,7 +118,7 @@ async def create_staking_api(
# 防止增加的币种没有 设置默认值
pending_assets.setdefault(create_staking_bill.currency, 0)
pending_assets[create_staking_bill.currency] += create_staking_bill.volume
await update_assets(fund_collect, create_staking_bill.fund_id, pending_assets=pending_assets, assets=assets)
await update_fund(fund_collect, create_staking_bill.fund_id, pending_assets=pending_assets, assets=assets)
# 添加账目
staking_bill = StakingBill(user_id=user.id, **create_staking_bill.dict())
......
......@@ -60,11 +60,11 @@ async def delete_fund(
description='更新基金')
async def update(
fund_id: str,
update_fund: UpdateFund,
update_fund_data: UpdateFund,
user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect)
):
db_update_data = update_fund.dict(exclude_unset=True)
db_update_data = update_fund_data.dict(exclude_unset=True)
db_update_data.update({
"update_time": int(datetime.datetime.utcnow().timestamp())
})
......
from typing import Union, List
from typing import List
from motor.core import AgnosticCollection
from starlette.background import BackgroundTasks
import dependencies
from exception.api import APIError
from exception.db import ExistDataError, NotFundError
from model import BaseResponse, Response, PageResponse, Page
from fastapi import APIRouter, Depends
......@@ -11,7 +11,7 @@ from fastapi import APIRouter, Depends
from model.fund import FundType
from model.node import BaseNode
from schema.beacon import Validator, ValidatorDeposit, ValidatorBlock, ValidatorIncome, Epoch
from schema.node import CreateNode
from schema.node import BindNode
from service.beacon import BeaconChaService
from tools.jwt_tools import User
......@@ -23,22 +23,25 @@ router = APIRouter()
summary='绑定节点',
description='绑定节点')
async def subscribe(
create_node: CreateNode,
bind_node: BindNode,
user: User = Depends(dependencies.get_current_user),
fund_collect: AgnosticCollection = Depends(dependencies.get_fund_collect),
beacon_service: BeaconChaService = Depends(BeaconChaService),
):
node_detail = await beacon_service.get_validator(index_or_pubkey=create_node.pub_key)
assert node_detail, "节点不存在,绑定失败"
db_data = BaseNode(**create_node.dict(), index=node_detail.validator_index)
node_detail = await beacon_service.get_validator(index_or_pubkey=bind_node.pub_key)
db_data = BaseNode(**bind_node.dict(), index=node_detail.validator_index)
# 限制staking基金才可绑定节点
query = {'id': create_node.fund_id, 'user_id': user.id, 'fund_type': FundType.staking}
query = {'id': bind_node.fund_id, 'user_id': user.id, 'fund_type': FundType.staking,
f"nodes.{bind_node.pub_key}": {'$exists': False}}
update = {"$set": {f"nodes.{bind_node.pub_key}": db_data.dict()}}
res = await fund_collect.update_one(
{**query, "nodes": {"$not": {"$elemMatch": {"pub_key": create_node.pub_key}}}},
{"$push": {"nodes": db_data.dict()}}
query,
update
)
if res.raw_result['nModified'] == 0:
raise ExistDataError(message='绑定失败,检查节点是否已存在')
raise ExistDataError(message='节点已存在')
return BaseResponse(data='绑定成功')
......@@ -49,13 +52,14 @@ async def unsubscribe(
user: User = Depends(dependencies.get_current_user),
fund_collect: AgnosticCollection = Depends(dependencies.get_fund_collect)
):
query = {'id': fund_id, 'user_id': user.id, 'fund_type': FundType.staking}
query = {'id': fund_id, 'user_id': user.id, 'fund_type': FundType.staking, f"nodes.{pub_key}": {"$exists": True}}
update = {"$unset": {f"nodes.{pub_key}": ""}}
res = await fund_collect.update_one(
{**query, "nodes": {"$elemMatch": {"pub_key": pub_key}}},
{"$pull": {"nodes": {"pub_key": pub_key}}}
query,
update
)
if res.raw_result['nModified'] == 0:
raise ExistDataError(message='解绑失败,检查节点是否存在')
raise APIError(message='解绑节点不存在')
return BaseResponse(data='解绑成功')
......@@ -67,14 +71,10 @@ async def get_node_info(
beacon_service: BeaconChaService = Depends(BeaconChaService),
fund_collect: AgnosticCollection = Depends(dependencies.get_fund_collect)
):
fund = await fund_collect.find_one({
"id": fund_id,
"user_id": user.id,
"nodes": {"$elemMatch": {"pub_key": pub_key}}
})
assert fund, NotFundError()
query = {'id': fund_id, 'user_id': user.id, 'fund_type': FundType.staking, f"nodes.{pub_key}": {"$exists": True}}
fund = await fund_collect.find_one(query)
assert fund, NotFundError('未绑定该节点')
validator_detail = await beacon_service.get_validator(pub_key)
assert validator_detail, NotFundError()
response = Response[Validator](data=validator_detail)
return response
......@@ -95,7 +95,7 @@ async def get_node_deposit(
"user_id": user.id,
"nodes": {"$elemMatch": {"pub_key": pub_key}}
})
assert fund, NotFundError()
assert fund, NotFundError('未绑定该节点')
validator_deposit_list = await beacon_service.get_validator_deposit(pub_key)
response = Response[List[ValidatorDeposit]](data=validator_deposit_list)
return response
......@@ -119,7 +119,7 @@ async def get_node_blocks(
"user_id": user.id,
"nodes": {"$elemMatch": {"pub_key": pub_key}}
})
assert fund, NotFundError()
assert fund, NotFundError('未绑定该节点')
node = next(filter(lambda x: x["pub_key"] == pub_key, fund["nodes"]))
validator_blocks = await beacon_service.get_validator_blocks(
index=node["index"],
......@@ -145,7 +145,7 @@ async def get_node_income(
"user_id": user.id,
"nodes": {"$elemMatch": {"pub_key": pub_key}}
})
assert fund, NotFundError()
assert fund, NotFundError('未绑定该节点')
validator_income_list = await beacon_service.get_validator_income(index_or_pubkey=pub_key)
assert validator_income_list, NotFundError()
response = Response[ValidatorIncome](data=validator_income_list[0])
......
......@@ -3,5 +3,5 @@ from starlette import status
class APIError(MyException):
status = status.HTTP_500_INTERNAL_SERVER_ERROR
status = status.HTTP_400_BAD_REQUEST
message = '接口错误'
......@@ -30,12 +30,12 @@ class ExchangeBill(MyBaseModel):
output_value: float = Field(..., description="输出价值")
profit: float = Field(0, description="置换产生的价差")
record_time: int = Field(default_factory=utc_now_timestamp, description='记录时间')
input_currency: str = Field(..., description="投入币种")
input_price: float = Field(..., description="投入币种价格")
input_volume: float = Field(..., description="投入数量")
output_currency: str = Field(..., description="输出币种")
output_price: float = Field(..., description="输出币种价格")
output_volume: float = Field(..., description="输出数量")
input_currency: str = Field(..., description="投入币种(花掉的币)")
input_price: float = Field(..., description="投入币种价格(花掉的币)")
input_volume: float = Field(..., description="投入数量(花掉的币)")
output_currency: str = Field(..., description="输出币种(买进的币)")
output_price: float = Field(..., description="输出币种价格(买进的币)")
output_volume: float = Field(..., description="输出数量(买进的币)")
remark: str = Field(default="", description="备注")
......
import datetime
from typing import List, Optional, Dict
from typing import Optional, Dict
from pydantic import Field
......@@ -37,7 +37,7 @@ class StakingFund(BaseFundItem):
user_id: str
user_email: str
nav: Optional[float] = Field(default=1, description='当前净值')
nodes: List[BaseNode]
nodes: Dict[str, BaseNode] = Field(default={}, description='绑定的节点')
assets: Dict[str, float] = Field(default={}, description='持仓')
adjust_assets: Dict[str, float] = Field(default={}, description='调整账户持仓')
pending_assets: Dict[str, float] = Field(default={}, description='pending资产')
......
import datetime
from enum import Enum
from typing import Optional
......@@ -63,12 +62,12 @@ class CreatePCFBill(BaseModel):
class CreateExchangeBill(BaseModel):
fund_id: str = Field(None, description='基金id')
input_currency: str = Field(..., description="投入币种")
input_price: float = Field(..., description="投入币种价格")
input_volume: float = Field(..., description="投入数量")
output_currency: str = Field(..., description="输出币种")
output_price: float = Field(..., description="输出币种价格")
output_volume: float = Field(..., description="输出数量")
input_currency: str = Field(..., description="投入币种(花掉的币)")
input_price: float = Field(..., description="投入币种价格(花掉的币)")
input_volume: float = Field(..., description="投入数量(花掉的币)")
output_currency: str = Field(..., description="输出币种(买进的币)")
output_price: float = Field(..., description="输出币种价格(买进的币)")
output_volume: float = Field(..., description="输出数量(买进的币)")
remark: str = Field(default="", description="备注")
record_time: int = Field(default_factory=lambda: utc_now_timestamp(), description='记录时间')
......
......@@ -11,7 +11,7 @@ class NodeStatus(str, Enum):
# 接口请求模型 创建
class CreateNode(BaseModel):
class BindNode(BaseModel):
pub_key: str = Field(..., description='绑定的key')
fund_id: str = Field(..., description='绑定基金的ID')
currency: str = Field(default='ETH', description='节点需要质押的币种')
......
......@@ -4,7 +4,9 @@ import time
from typing import List, Union
from pydantic import BaseModel, Field
from exception.http import RequestHttpException, RequestInvalidParamsError
from exception.db import NotFundError
from exception.http import RequestHttpException
from schema.beacon import Validator, ValidatorDeposit, ValidatorBlock, ValidatorIncome, Epoch
from tools.http_helper import aio_request
......@@ -59,7 +61,7 @@ class BeaconChaService:
async def service_request(self, url):
response = await aio_request(url=url, method="GET", headers=self.headers)
if 'ERROR: invalid validator-parameter' in response['status']:
raise RequestInvalidParamsError('错误的参数')
raise NotFundError("参数错误,节点不存在")
assert response["status"] == "OK", RequestHttpException('获取数据失败')
return response
......@@ -112,7 +114,7 @@ class BeaconChaService:
async def get_validator_income(self, index_or_pubkey: Union[int, str]) -> List[ValidatorIncome]:
"""获取质押收益"""
url = f"{self.base_url}/api/v1/validator/{index_or_pubkey}/performance"
data = await self.service_request(url)
response = await self.service_request(url)
return [ValidatorIncome(
balance=item["balance"] / self.eth_decimal,
performance1d=item["performance1d"] / self.eth_decimal,
......@@ -121,7 +123,7 @@ class BeaconChaService:
performance365d=item["performance365d"] / self.eth_decimal,
rank7d=item["rank7d"],
validatorindex=item["validatorindex"]
) for item in data["data"]]
) for item in response["data"]]
async def get_validator_blocks(self, index: int, start: int = 0, length: int = 10) -> ValidatorBlocks:
url = f"https://beaconcha.in/validator/{index}/proposedblocks?draw=1&columns%5B0%5D%5Bdata%5D=0&columns%5B0%5D%5Bname%5D=&columns%5B0%5D%5Bsearchable%5D=true&columns%5B0%5D%5Borderable%5D=true&columns%5B0%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B0%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B1%5D%5Bdata%5D=1&columns%5B1%5D%5Bname%5D=&columns%5B1%5D%5Bsearchable%5D=true&columns%5B1%5D%5Borderable%5D=false&columns%5B1%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B1%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B2%5D%5Bdata%5D=2&columns%5B2%5D%5Bname%5D=&columns%5B2%5D%5Bsearchable%5D=true&columns%5B2%5D%5Borderable%5D=true&columns%5B2%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B2%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B3%5D%5Bdata%5D=3&columns%5B3%5D%5Bname%5D=&columns%5B3%5D%5Bsearchable%5D=true&columns%5B3%5D%5Borderable%5D=false&columns%5B3%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B3%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B4%5D%5Bdata%5D=4&columns%5B4%5D%5Bname%5D=&columns%5B4%5D%5Bsearchable%5D=true&columns%5B4%5D%5Borderable%5D=false&columns%5B4%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B4%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B5%5D%5Bdata%5D=5&columns%5B5%5D%5Bname%5D=&columns%5B5%5D%5Bsearchable%5D=true&columns%5B5%5D%5Borderable%5D=true&columns%5B5%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B5%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B6%5D%5Bdata%5D=6&columns%5B6%5D%5Bname%5D=&columns%5B6%5D%5Bsearchable%5D=true&columns%5B6%5D%5Borderable%5D=true&columns%5B6%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B6%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B7%5D%5Bdata%5D=7&columns%5B7%5D%5Bname%5D=&columns%5B7%5D%5Bsearchable%5D=true&columns%5B7%5D%5Borderable%5D=false&columns%5B7%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B7%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B8%5D%5Bdata%5D=8&columns%5B8%5D%5Bname%5D=&columns%5B8%5D%5Bsearchable%5D=true&columns%5B8%5D%5Borderable%5D=true&columns%5B8%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B8%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B9%5D%5Bdata%5D=9&columns%5B9%5D%5Bname%5D=&columns%5B9%5D%5Bsearchable%5D=true&columns%5B9%5D%5Borderable%5D=true&columns%5B9%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B9%5D%5Bsearch%5D%5Bregex%5D=false&order%5B0%5D%5Bcolumn%5D=0&order%5B0%5D%5Bdir%5D=desc&start={start}&length={length}&search%5Bvalue%5D=&search%5Bregex%5D=false&_={time.time() * 1000}"
......
......@@ -18,8 +18,19 @@ async def query_fund_assets(fund_collect, fund_id, user_id=None, fund_status=Non
# 修改资产
async def update_assets(fund_collect, fund_id, *, assets=None, adjust_assets=None, pending_assets=None,
staking_assets=None):
async def update_fund(fund_collect, fund_id, *, assets=None, adjust_assets=None, pending_assets=None,
staking_assets=None, node=None):
"""
更新fund相关信息
:param fund_collect:
:param fund_id:
:param assets: {"ETH":2 }
:param adjust_assets: {"ETH":2 }
:param pending_assets: {"ETH":2 }
:param staking_assets: {"ETH":2 }
:param node: # 更新节点aaa 状态 { "nodes.aaa.status": NodeStatus.Finish }
:return:
"""
query = {'id': fund_id}
update_data = {"assets_update": utc_now_timestamp()}
if assets:
......@@ -30,4 +41,6 @@ async def update_assets(fund_collect, fund_id, *, assets=None, adjust_assets=Non
update_data.update({"pending_assets": pending_assets})
if staking_assets:
update_data.update({"staking_assets": staking_assets})
if node:
update_data.update(node)
await fund_collect.update_one(query, {"$set": update_data})
......@@ -3,7 +3,8 @@ from motor.core import AgnosticCollection
from exception.db import NotFundError
from model.bill import StakingBill
from schema.fund import FundStatus
from service.fund import query_fund_assets, update_assets
from schema.node import NodeStatus
from service.fund import query_fund_assets, update_fund
from service.nav import calculate_nav
from loguru import logger
from pymongo import ReturnDocument
......@@ -70,6 +71,7 @@ async def update_staking_bill_status_task(beacon_service: BeaconChaService, mong
{'$set': {"status": status, "update_time": utc_now_timestamp()}},
return_document=ReturnDocument.AFTER)
if status == StakingBillStatus.finish:
# 更新fund状态
fund_collect = get_fund_collect(mongodb_manager)
assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect,
bill_item.fund_id,
......@@ -77,7 +79,9 @@ async def update_staking_bill_status_task(beacon_service: BeaconChaService, mong
staking_assets.setdefault(bill_item.currency, 0)
staking_assets[bill_item.currency] += bill_item.volume
pending_assets[bill_item.currency] -= bill_item.volume
await update_assets(fund_collect, bill_item.fund_id, pending_assets=pending_assets, staking_assets=staking_assets)
await update_fund(fund_collect, bill_item.fund_id, pending_assets=pending_assets,
staking_assets=staking_assets,
node={f"nodes.{bill_item.pub_key}.status": NodeStatus.active})
logger.info(f"[更新质押账单状态任务] [已更新] [{bill_item.id}] {status}")
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment