Commit 5e4af2f5 authored by 杨明橙's avatar 杨明橙

修改创建 调整账目,置换币账目 接口

parent 08b35f7b
...@@ -30,14 +30,22 @@ async def create_pcf( ...@@ -30,14 +30,22 @@ async def create_pcf(
fund_collect: AgnosticCollection = Depends(get_fund_collect), fund_collect: AgnosticCollection = Depends(get_fund_collect),
bill_collect: AgnosticCollection = Depends(get_bill_collect), bill_collect: AgnosticCollection = Depends(get_bill_collect),
): ):
assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect, item.fund_id, user.id) inc = item.volume if item.bill_type == PCFBillType.sub else -item.volume
if item.currency in assets:
inc = item.volume if item.bill_type == PCFBillType.sub else -item.volume assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect, item.fund_id, user.id,
assert assets[item.currency] + inc >= 0, "余额不足" FundStatus.active)
assets[item.currency] += inc assets.setdefault(item.currency, 0)
else: # 如果是赎回 判断余额是否够
assert item.bill_type == PCFBillType.sub, "余额不足" assert assets[item.currency] + inc >= 0, "余额不足"
assets.update({item.currency: item.volume}) assets[item.currency] += inc
# if item.currency in assets:
# inc = item.volume if item.bill_type == PCFBillType.sub else -item.volume
# assert assets[item.currency] + inc >= 0, "余额不足"
# assets[item.currency] += inc
# else:
# assert item.bill_type == PCFBillType.sub, "余额不足"
# assets.update({item.currency: item.volume})
await update_assets(fund_collect, item.fund_id, assets=assets) await update_assets(fund_collect, item.fund_id, assets=assets)
pcf = PCFBill(user_id=user.id, **item.dict()) pcf = PCFBill(user_id=user.id, **item.dict())
await bill_collect.insert_one(pcf.dict()) await bill_collect.insert_one(pcf.dict())
...@@ -50,34 +58,49 @@ async def create_pcf( ...@@ -50,34 +58,49 @@ async def create_pcf(
summary='添加置换币账目', summary='添加置换币账目',
description='添加置换币账目') description='添加置换币账目')
async def create_exchange( async def create_exchange(
item: CreateExchangeBill, create_exchange_bill: CreateExchangeBill,
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect), fund_collect: AgnosticCollection = Depends(get_fund_collect),
bill_collect: AgnosticCollection = Depends(get_bill_collect), bill_collect: AgnosticCollection = Depends(get_bill_collect),
): ):
fund = await fund_collect.find_one({"id": item.fund_id, "user_id": user.id}) assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect,
assert fund, NotFundError() create_exchange_bill.fund_id,
filter_asset = list(filter(lambda x: x["currency"] == item.input_currency, fund["assets"])) user.id,
assert filter_asset, f"{item.input_currency}余额不足" FundStatus.active)
input_asset = filter_asset[0] assets.setdefault(create_exchange_bill.output_currency, 0)
assert input_asset["volume"] >= item.input_volume, f"{item.input_currency}余额不足" assets.setdefault(create_exchange_bill.input_currency, 0)
update_input = UpdateOne( assert assets[
{"id": item.fund_id, "assets.currency": item.input_currency}, create_exchange_bill.output_currency] >= create_exchange_bill.output_volume, f"{create_exchange_bill.output_currency}余额不足"
{"$inc": {"assets.$.volume": -item.input_volume}}
) assets[create_exchange_bill.output_currency] -= create_exchange_bill.output_volume
output_filter = list(filter(lambda x: x["currency"] == item.output_currency, fund["assets"])) assets[create_exchange_bill.input_currency] += create_exchange_bill.input_volume
update_output = UpdateOne(
{"id": item.fund_id, "assets.currency": item.output_currency}, await update_assets(fund_collect, create_exchange_bill.fund_id, assets=assets)
{"$inc": {"assets.$.volume": item.output_volume}}
) if output_filter else UpdateOne( # fund = await fund_collect.find_one({"id": item.fund_id, "user_id": user.id})
{"id": item.fund_id}, # assert fund, NotFundError()
{"$push": {"assets": {"currency": item.output_currency, "volume": item.output_volume}}} # filter_asset = list(filter(lambda x: x["currency"] == item.input_currency, fund["assets"]))
) # assert filter_asset, f"{item.input_currency}余额不足"
result = await fund_collect.bulk_write([update_input, update_output]) # input_asset = filter_asset[0]
logger.info(result.modified_count) # assert input_asset["volume"] >= item.input_volume, f"{item.input_currency}余额不足"
input_value, output_value = item.input_volume * item.input_price, item.output_volume * item.output_price # update_input = UpdateOne(
# {"id": item.fund_id, "assets.currency": item.input_currency},
# {"$inc": {"assets.$.volume": -item.input_volume}}
# )
# output_filter = list(filter(lambda x: x["currency"] == item.output_currency, fund["assets"]))
# update_output = UpdateOne(
# {"id": item.fund_id, "assets.currency": item.output_currency},
# {"$inc": {"assets.$.volume": item.output_volume}}
# ) if output_filter else UpdateOne(
# {"id": item.fund_id},
# {"$push": {"assets": {"currency": item.output_currency, "volume": item.output_volume}}}
# )
# result = await fund_collect.bulk_write([update_input, update_output])
# logger.info(result.modified_count)
input_value, output_value = create_exchange_bill.input_volume * create_exchange_bill.input_price, create_exchange_bill.output_volume * create_exchange_bill.output_price
exchange_bill = ExchangeBill(user_id=user.id, input_value=input_value, output_value=output_value, exchange_bill = ExchangeBill(user_id=user.id, input_value=input_value, output_value=output_value,
profit=output_value - input_value, **item.dict()) profit=output_value - input_value, **create_exchange_bill.dict())
await bill_collect.insert_one(exchange_bill.dict()) await bill_collect.insert_one(exchange_bill.dict())
return Response[ExchangeBill](data=exchange_bill.dict()) return Response[ExchangeBill](data=exchange_bill.dict())
...@@ -88,23 +111,32 @@ async def create_exchange( ...@@ -88,23 +111,32 @@ async def create_exchange(
summary='添加调整账目', summary='添加调整账目',
description='添加调整账目') description='添加调整账目')
async def create_adjust( async def create_adjust(
item: CreateAdjustBill, create_adjust_bill: CreateAdjustBill,
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect), fund_collect: AgnosticCollection = Depends(get_fund_collect),
bill_collect: AgnosticCollection = Depends(get_bill_collect), bill_collect: AgnosticCollection = Depends(get_bill_collect),
): ):
query = {"id": item.fund_id, "user_id": user.id} assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect,
result = await fund_collect.update_one( create_adjust_bill.fund_id, user.id,
{**query, "adjust_assets": {"$not": {"$elemMatch": {"currency": item.currency}}}}, FundStatus.active)
{"$push": {"adjust_assets": {"currency": item.currency, "volume": item.volume}}} adjust_assets.setdefault(create_adjust_bill.currency, 0)
) adjust_assets.setdefault('fund_share', 0)
if result.modified_count == 0: adjust_assets[create_adjust_bill.currency] += create_adjust_bill.volume
inc_result = await fund_collect.update_one( adjust_assets['fund_share'] += create_adjust_bill.fund_share
{**query, "adjust_assets.currency": item.currency},
{"$inc": {"adjust_assets.$.volume": item.volume}} # query = {"id": item.fund_id, "user_id": user.id}
) # result = await fund_collect.update_one(
logger.info(f"inc_result={inc_result.modified_count}") # {**query, "adjust_assets": {"$not": {"$elemMatch": {"currency": item.currency}}}},
adjust_bill = AdjustBill(user_id=user.id, **item.dict()) # {"$push": {"adjust_assets": {"currency": item.currency, "volume": item.volume}}}
# )
# if result.modified_count == 0:
# inc_result = await fund_collect.update_one(
# {**query, "adjust_assets.currency": item.currency},
# {"$inc": {"adjust_assets.$.volume": item.volume}}
# )
# logger.info(f"inc_result={inc_result.modified_count}")
await update_assets(fund_collect, create_adjust_bill.fund_id, adjust_assets=adjust_assets)
adjust_bill = AdjustBill(user_id=user.id, **create_adjust_bill.dict())
await bill_collect.insert_one(adjust_bill.dict()) await bill_collect.insert_one(adjust_bill.dict())
response = Response[AdjustBill](data=adjust_bill.dict()) response = Response[AdjustBill](data=adjust_bill.dict())
return response return response
...@@ -121,18 +153,21 @@ async def create_staking_api( ...@@ -121,18 +153,21 @@ async def create_staking_api(
bill_collect: AgnosticCollection = Depends(get_bill_collect), bill_collect: AgnosticCollection = Depends(get_bill_collect),
fund_collect: AgnosticCollection = Depends(get_fund_collect) fund_collect: AgnosticCollection = Depends(get_fund_collect)
): ):
staking_bill = await create_staking(create_staking_bill, user.id, bill_collect, fund_collect)
assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect, assets, adjust_assets, pending_assets, staking_assets = await query_fund_assets(fund_collect,
user_id=user.id, user_id=user.id,
fund_id=staking_bill.fund_id, fund_id=create_staking_bill.fund_id,
fund_status=FundStatus.active) fund_status=FundStatus.active)
assert assets.get(create_staking_bill.currency, 0) >= create_staking_bill.volume, APIError(message='余额不足') assert assets.get(create_staking_bill.currency, 0) >= create_staking_bill.volume, '余额不足'
assets[create_staking_bill.currency] -= create_staking_bill.volume assets[create_staking_bill.currency] -= create_staking_bill.volume
# 防止增加的币种没有 设置默认值 # 防止增加的币种没有 设置默认值
pending_assets.setdefault(create_staking_bill.currency, 0) pending_assets.setdefault(create_staking_bill.currency, 0)
pending_assets[create_staking_bill.currency] += create_staking_bill.volume pending_assets[create_staking_bill.currency] += create_staking_bill.volume
await update_assets(fund_collect, create_staking_bill.fund_id, pending_assets=pending_assets, assets=assets) await update_assets(fund_collect, create_staking_bill.fund_id, pending_assets=pending_assets, assets=assets)
# 添加账目
staking_bill = StakingBill(user_id=user.id, **create_staking_bill.dict())
await bill_collect.insert_one(staking_bill.dict())
response = Response[StakingBill](data=staking_bill.dict()) response = Response[StakingBill](data=staking_bill.dict())
return response return response
......
...@@ -94,8 +94,8 @@ async def startup(): ...@@ -94,8 +94,8 @@ async def startup():
misfire_grace_time=20 misfire_grace_time=20
) )
if settings.env == 'LOCAL': # if settings.env == 'LOCAL':
return # return
app.state.scheduler.start() app.state.scheduler.start()
app.state.scheduler.print_jobs() app.state.scheduler.print_jobs()
......
...@@ -6,6 +6,7 @@ from pydantic import Field ...@@ -6,6 +6,7 @@ from pydantic import Field
from model import MyBaseModel from model import MyBaseModel
from model.node import BaseNode from model.node import BaseNode
from schema.fund import FundType, FundStatus from schema.fund import FundType, FundStatus
from tools.time_helper import utc_now_timestamp
class BaseFundItem(MyBaseModel): class BaseFundItem(MyBaseModel):
...@@ -29,6 +30,7 @@ class NormalFund(BaseFundItem): ...@@ -29,6 +30,7 @@ class NormalFund(BaseFundItem):
adjust_assets: Dict[str, float] = Field(default={}, description='调整账户持仓') adjust_assets: Dict[str, float] = Field(default={}, description='调整账户持仓')
pending_assets: Dict[str, float] = Field(default={}, description='pending资产') pending_assets: Dict[str, float] = Field(default={}, description='pending资产')
staking_assets: Dict[str, float] = Field(default={}, description='质押资产') staking_assets: Dict[str, float] = Field(default={}, description='质押资产')
assets_update: float = Field(default_factory=utc_now_timestamp, description='余额更新时间')
class StakingFund(BaseFundItem): class StakingFund(BaseFundItem):
...@@ -40,3 +42,4 @@ class StakingFund(BaseFundItem): ...@@ -40,3 +42,4 @@ class StakingFund(BaseFundItem):
adjust_assets: Dict[str, float] = Field(default={}, description='调整账户持仓') adjust_assets: Dict[str, float] = Field(default={}, description='调整账户持仓')
pending_assets: Dict[str, float] = Field(default={}, description='pending资产') pending_assets: Dict[str, float] = Field(default={}, description='pending资产')
staking_assets: Dict[str, float] = Field(default={}, description='质押资产') staking_assets: Dict[str, float] = Field(default={}, description='质押资产')
assets_update: float = Field(default_factory=utc_now_timestamp, description='余额更新时间')
...@@ -2,6 +2,7 @@ from typing import Tuple ...@@ -2,6 +2,7 @@ from typing import Tuple
from exception.db import NotFundError from exception.db import NotFundError
from schema.fund import FundStatus from schema.fund import FundStatus
from tools.time_helper import utc_now_timestamp
async def query_fund_assets(fund_collect, fund_id, user_id=None, fund_status=None) -> Tuple[dict, dict, dict, dict]: async def query_fund_assets(fund_collect, fund_id, user_id=None, fund_status=None) -> Tuple[dict, dict, dict, dict]:
...@@ -17,10 +18,10 @@ async def query_fund_assets(fund_collect, fund_id, user_id=None, fund_status=Non ...@@ -17,10 +18,10 @@ async def query_fund_assets(fund_collect, fund_id, user_id=None, fund_status=Non
# 修改资产 # 修改资产
async def update_assets(fund_collect, fund_id, assets=None, adjust_assets=None, pending_assets=None, async def update_assets(fund_collect, fund_id, *, assets=None, adjust_assets=None, pending_assets=None,
staking_assets=None): staking_assets=None):
query = {'id': fund_id} query = {'id': fund_id}
update_data = {} update_data = {"assets_update": utc_now_timestamp()}
if assets: if assets:
update_data.update({"assets": assets}) update_data.update({"assets": assets})
if adjust_assets: if adjust_assets:
......
...@@ -13,7 +13,7 @@ from dependencies import get_bill_collect, get_fund_collect ...@@ -13,7 +13,7 @@ from dependencies import get_bill_collect, get_fund_collect
from exception.http import RequestInvalidParamsError from exception.http import RequestInvalidParamsError
from schema.bill import StakingBillStatus, AllBillType from schema.bill import StakingBillStatus, AllBillType
from service.beacon import BeaconChaService from service.beacon import BeaconChaService
from tools.time_helper import utc_now from tools.time_helper import utc_now_timestamp
async def delete_task(job_id, scheduler): async def delete_task(job_id, scheduler):
...@@ -67,7 +67,7 @@ async def update_staking_bill_status_task(beacon_service: BeaconChaService, mong ...@@ -67,7 +67,7 @@ async def update_staking_bill_status_task(beacon_service: BeaconChaService, mong
if status: if status:
await bill_collect.find_one_and_update( await bill_collect.find_one_and_update(
{'id': bill_item.id}, {'id': bill_item.id},
{'$set': {"status": status, "update_time": utc_now()}}, {'$set': {"status": status, "update_time": utc_now_timestamp()}},
return_document=ReturnDocument.AFTER) return_document=ReturnDocument.AFTER)
if status == StakingBillStatus.finish: if status == StakingBillStatus.finish:
fund_collect = get_fund_collect(mongodb_manager) fund_collect = get_fund_collect(mongodb_manager)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment