Commit 76336ff9 authored by confusion's avatar confusion

1.添加账单删除接口

2.添加增加获取币种历史报价任务
parent f819bb2b
...@@ -292,3 +292,19 @@ async def query_bill( ...@@ -292,3 +292,19 @@ async def query_bill(
result = await cursor.to_list(length=None) result = await cursor.to_list(length=None)
response = PageResponse[Any](data=result, **page.dict(), total=len(result)) response = PageResponse[Any](data=result, **page.dict(), total=len(result))
return response return response
@router.delete('/{fund_id}/',
tags=["删除"],
response_model=Response,
summary='删除账单记录',
description='删除账单记录')
async def query_bill(
fund_id: str,
bill_type: BillType,
bill_collect: AgnosticCollection = Depends(get_bill_collect),
user: User = Depends(get_current_user)
):
query = {"fund_id": fund_id, "user_id": user.id, "bill_type": bill_type}
await bill_collect.delete_one(query)
return Response()
...@@ -12,18 +12,6 @@ from tools.jwt_tools import User ...@@ -12,18 +12,6 @@ from tools.jwt_tools import User
router = APIRouter() router = APIRouter()
# @router.post('/',
# response_model=BaseResponse,
# summary='创建节点刷新任务【测试】',
# description='创建节点刷新任务')
# async def create(
# scheduler: dependencies.AsyncIOScheduler = Depends(dependencies.get_schedular)
# ):
# scheduler.add_job(service.node.refresh_status, trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC),
# misfire_grace_time=10)
# return BaseResponse(data='创建成功')
@router.post('/', @router.post('/',
response_model=BaseResponse, response_model=BaseResponse,
summary='绑定节点', summary='绑定节点',
......
from typing import Any
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends, APIRouter from fastapi import Depends, APIRouter
from motor.core import AgnosticCollection from motor.core import AgnosticCollection
from starlette.background import BackgroundTasks
from dependencies import get_current_user, get_scheduler, get_fund_collect from db import AioMongodbManager
from model import BaseResponse from dependencies import get_current_user, get_scheduler, get_fund_collect, get_mongodb_manager
from model import BaseResponse, Response, ErrorResponse
from service.price import CMCPrice
from service.scheduler import calculate_nav_task, delete_task from service.scheduler import calculate_nav_task, delete_task
from tools.jwt_tools import User from tools.jwt_tools import User
...@@ -33,3 +38,21 @@ async def stop( ...@@ -33,3 +38,21 @@ async def stop(
): ):
await delete_task(job_id, scheduler) await delete_task(job_id, scheduler)
return BaseResponse(message='停止成功') return BaseResponse(message='停止成功')
@router.post('/init_history/{symbol}/',
response_model=Response,
summary='创建币种历史报价获取任务',
description='创建币种历史报价获取任务')
async def create_init_ohlcv(
symbol: str,
background_tasks: BackgroundTasks,
mongodb_manager: AioMongodbManager = Depends(get_mongodb_manager)
):
cmc_price = CMCPrice(mongodb_manager)
try:
await cmc_price.filter_symbol_info(symbol)
except AssertionError:
return ErrorResponse(message=f'没有匹配到币种 [{symbol}]')
background_tasks.add_task(cmc_price.start_init_ohlcv, symbol=symbol)
return Response(message='创建成功')
...@@ -10,7 +10,7 @@ from configs import settings ...@@ -10,7 +10,7 @@ from configs import settings
from db import AioMongodbManager from db import AioMongodbManager
from dependencies import get_hour_price_collect from dependencies import get_hour_price_collect
from exception.http import RequestHttpException from exception.http import RequestHttpException
from tools.cache_helper import update_cache, delete_cache, get_many_cache, save_cache from tools.cache_helper import update_cache, delete_cache, get_many_cache, save_cache, get_cache
from tools.http_helper import aio_request from tools.http_helper import aio_request
from tools.time_helper import utc_now from tools.time_helper import utc_now
...@@ -70,13 +70,14 @@ class CMCPrice: ...@@ -70,13 +70,14 @@ class CMCPrice:
'create_time': utc_now(), 'create_time': utc_now(),
'name': 'base_coin'} for item in self.base_coins]) 'name': 'base_coin'} for item in self.base_coins])
def filter_symbol_info(self, symbol): async def filter_symbol_info(self, symbol):
await self.init()
symbol_info = self.cmc_map.get(symbol) symbol_info = self.cmc_map.get(symbol)
assert symbol_info, Exception(f'Not match [{symbol}] in {self.quick_search_url}') assert symbol_info, Exception(f'Not match [{symbol}] in {self.quick_search_url}')
return symbol_info return symbol_info
def query_added_date(self, symbol): async def query_added_date(self, symbol):
symbol_info = self.filter_symbol_info(symbol) symbol_info = await self.filter_symbol_info(symbol)
try: try:
return datetime.datetime.fromisoformat(symbol_info['first_historical_data'].split('.')[ return datetime.datetime.fromisoformat(symbol_info['first_historical_data'].split('.')[
0]).replace(minute=0, second=0, microsecond=0, tzinfo=pytz.UTC) 0]).replace(minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
...@@ -84,7 +85,14 @@ class CMCPrice: ...@@ -84,7 +85,14 @@ class CMCPrice:
logger.error(f'查询起始时间失败,使用默认时间 {symbol} {e}') logger.error(f'查询起始时间失败,使用默认时间 {symbol} {e}')
return datetime.datetime.fromisoformat('2023-01-01T00:00:00').replace(tzinfo=pytz.UTC) return datetime.datetime.fromisoformat('2023-01-01T00:00:00').replace(tzinfo=pytz.UTC)
async def delete_exist_data(self, symbol, filtered_data, hour_price_collect): async def update_task_status(self, symbol, next_start, status='normal'):
await update_cache(self.mongodb_manager,
name=f'hour_price_task_{symbol}',
data={"next_start": next_start, "update_time": utc_now(), "status": status},
upsert=True)
@staticmethod
async def delete_exist_data(symbol, filtered_data, hour_price_collect):
delete_time = set([]) delete_time = set([])
for item in filtered_data: for item in filtered_data:
delete_time.add(item['time']) delete_time.add(item['time'])
...@@ -93,47 +101,11 @@ class CMCPrice: ...@@ -93,47 +101,11 @@ class CMCPrice:
logger.error(f'[删除存在数据] {symbol} [{len(delete_time)}]') logger.error(f'[删除存在数据] {symbol} [{len(delete_time)}]')
@staticmethod @staticmethod
def get_end_time(): def get_end_time(offset=0):
return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC, microsecond=0, second=0, return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC, microsecond=0, second=0,
minute=0) - datetime.timedelta(hours=1) minute=0) - datetime.timedelta(hours=1 + offset)
async def save_history_data(self, symbol, start_time=None): async def ohlcv_request_core(self, symbol, start_time=None, count=5):
await self.init()
end_time = self.get_end_time()
hour_price_collect = get_hour_price_collect(self.mongodb_manager)
start_time = start_time or self.query_added_date(symbol)
next_start = None
finish = False
while not finish:
try:
for_save_data = await self.query_ohlcv_from_api(symbol, start_time, count=self.per_request_limit)
except RequestHttpException:
logger.warning(f'[小时数据请求失败] [{symbol}] [{start_time}]')
return
# 过滤数据在开始和结束时间范围内
filtered_data = []
for item in for_save_data:
next_start = item['time']
if end_time >= item['time'] >= start_time:
filtered_data.append(item)
if next_start >= end_time:
finish = True
if not filtered_data:
break
# 删除存在数据
await self.delete_exist_data(symbol, filtered_data, hour_price_collect)
insert_res = await hour_price_collect.insert_many(filtered_data)
inserted = len(insert_res.inserted_ids)
logger.info(f'[小时数据写入成功] [{symbol}] [{inserted}]')
# 保存进度 用于重启后继续获取
await self.update_task_status(symbol, next_start)
start_time = next_start
logger.info(f'[小时数据完成] [{symbol}]')
await delete_cache(self.mongodb_manager, name=f'hour_price_task_{symbol}')
async def query_ohlcv_from_api(self, symbol, start_time=None, count=5):
""" """
返回的数不包含开始时间 返回的数不包含开始时间
:param symbol: :param symbol:
...@@ -143,6 +115,7 @@ class CMCPrice: ...@@ -143,6 +115,7 @@ class CMCPrice:
""" """
await self.init() await self.init()
url = f'https://web-api.coinmarketcap.com/v2/cryptocurrency/ohlcv/historical?interval=1h&time_period=hourly&count={count}&symbol={symbol}&time_start={int(start_time.timestamp())}' url = f'https://web-api.coinmarketcap.com/v2/cryptocurrency/ohlcv/historical?interval=1h&time_period=hourly&count={count}&symbol={symbol}&time_start={int(start_time.timestamp())}'
error = ''
for i in range(5): for i in range(5):
for_save_data = [] for_save_data = []
try: try:
...@@ -163,24 +136,82 @@ class CMCPrice: ...@@ -163,24 +136,82 @@ class CMCPrice:
}) })
for_save_data.append(ohlcv_item) for_save_data.append(ohlcv_item)
return for_save_data return for_save_data
except: except Exception as e:
pass error = str(e)
raise RequestHttpException() raise RequestHttpException(message=f'[请求失败] [{url}] [{error}]')
async def update_task_status(self, symbol, next_start): async def task_core(self, symbol, this_start_time):
await update_cache(self.mongodb_manager, name=f'hour_price_task_{symbol}', await self.init()
data={"next_start": next_start, "update_time": utc_now()}, try:
upsert=True) for_save_data = await self.ohlcv_request_core(symbol, this_start_time, count=self.per_request_limit)
except RequestHttpException as e:
raise Exception(f'[小时数据请求失败] [{symbol}] [{this_start_time}] [{e}]')
# 过滤数据在开始和结束时间范围内
next_start = None
end_time = self.get_end_time()
filtered_data = []
for item in for_save_data:
next_start = item['time']
if end_time >= item['time'] > this_start_time:
filtered_data.append(item)
if not filtered_data:
return None
# 删除存在数据
hour_price_collect = get_hour_price_collect(self.mongodb_manager)
await self.delete_exist_data(symbol, filtered_data, hour_price_collect)
# 写入数据
insert_res = await hour_price_collect.insert_many(filtered_data)
inserted = len(insert_res.inserted_ids)
logger.info(
f'[小时数据写入成功] [{symbol}] [{inserted}] [{filtered_data[0]["time"]} ... {filtered_data[-1]["time"]}]')
if next_start >= end_time:
next_start = None
return next_start
async def history_task(self, symbol):
"""
根据数据库缓存信息 执行任务
"""
task_status = await get_cache(self.mongodb_manager, name=f'hour_price_task_{symbol}')
if not task_status:
return
this_start_time = task_status['next_start']
try:
next_start = await self.task_core(symbol, this_start_time)
except Exception as e:
logger.error(e)
return await self.update_task_status(symbol, this_start_time, str(e))
if not next_start:
logger.info(f'[小时数据完成] [{symbol}]')
return await delete_cache(self.mongodb_manager, name=f'hour_price_task_{symbol}')
else:
# 保存进度 用于重启后继续获取
await self.update_task_status(symbol, next_start)
return await self.history_task(symbol)
async def start_init_ohlcv(self, symbol, resume=False):
await self.init()
if not resume:
start_time = await self.query_added_date(symbol)
# 新建进度
await self.update_task_status(symbol, start_time)
await self.history_task(symbol)
async def update_latest_data(self): async def start_update_ohlcv(self):
# 更新需要获取报价币种 # 更新需要获取报价币种
await self.load_base_coins() await self.load_base_coins()
for symbol in self.base_coins: for symbol in self.base_coins:
await self.save_history_data(symbol, start_time=self.get_end_time() - datetime.timedelta(hours=2)) start_time = self.get_end_time(offset=2)
# 新建进度
await self.update_task_status(symbol, start_time)
await self.history_task(symbol)
async def check_data(self, symbol, start_time=None): async def check_data(self, symbol, start_time=None):
await self.init() await self.init()
start_time = start_time or self.query_added_date(symbol) start_time = start_time or (await self.query_added_date(symbol))
end_time = self.get_end_time() end_time = self.get_end_time()
find_end_time = min([start_time + datetime.timedelta(hours=24 * 365), end_time]) find_end_time = min([start_time + datetime.timedelta(hours=24 * 365), end_time])
hour_price_collect = get_hour_price_collect(self.mongodb_manager) hour_price_collect = get_hour_price_collect(self.mongodb_manager)
...@@ -216,7 +247,7 @@ class CMCPrice: ...@@ -216,7 +247,7 @@ class CMCPrice:
async def update_scheduler_task(self): async def update_scheduler_task(self):
logger.info(f'更新报价定时任务开始执行 [{self.base_coins}]') logger.info(f'更新报价定时任务开始执行 [{self.base_coins}]')
await self.update_latest_data() await self.start_update_ohlcv()
async def get_price(coin_list, query_time, db_client): async def get_price(coin_list, query_time, db_client):
...@@ -249,6 +280,6 @@ if __name__ == '__main__': ...@@ -249,6 +280,6 @@ if __name__ == '__main__':
mongodb_manager = AioMongodbManager() mongodb_manager = AioMongodbManager()
mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund') mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund')
cmc_price = CMCPrice(mongodb_manager) cmc_price = CMCPrice(mongodb_manager)
# asyncio.run(cmc_price.update_latest_data()) asyncio.run(cmc_price.start_update_ohlcv())
# asyncio.run(cmc_price.save_history_data('BTC')) # asyncio.run(cmc_price.save_history_data('BTC'))
asyncio.run(cmc_price.check_data('BTC')) # asyncio.run(cmc_price.start_init_ohlcv('USDC', resume=False))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment