Commit 9f58969b authored by 陈涛's avatar 陈涛

创建基金自动添加计算净值定时任务

parent 9904f5dd
import datetime import datetime
from typing import Union, Optional from typing import Union, Optional
import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import APIRouter, Depends, Query from fastapi import APIRouter, Depends, Query, Request
from motor.core import AgnosticCollection from motor.core import AgnosticCollection
from pymongo import ReturnDocument from pymongo import ReturnDocument
from starlette.background import BackgroundTasks
from exception.db import NotFundError from exception.db import NotFundError
from model import Response, PageResponse, Page from model import Response, PageResponse, Page
from model.fund import FundType, StakingFund, NormalFund, FundStatus from model.fund import FundType, StakingFund, NormalFund, FundStatus
from dependencies import get_current_user, get_fund_collect, get_scheduler from dependencies import get_current_user, get_fund_collect, get_scheduler
from schema.fund import CreateFund, UpdateFund from schema.fund import CreateFund, UpdateFund
from service.scheduler import delete_nav_task from service.scheduler import delete_nav_task, calculate_nav_task, get_next_execute_time
from tools.jwt_tools import User from tools.jwt_tools import User
router = APIRouter() router = APIRouter()
...@@ -26,7 +28,9 @@ fund_type_map = { ...@@ -26,7 +28,9 @@ fund_type_map = {
summary='创建基金', summary='创建基金',
description='创建基金') description='创建基金')
async def create( async def create(
request: Request,
create_fund: CreateFund, create_fund: CreateFund,
background_tasks: BackgroundTasks,
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect) fund_collect: AgnosticCollection = Depends(get_fund_collect)
): ):
...@@ -37,7 +41,20 @@ async def create( ...@@ -37,7 +41,20 @@ async def create(
response_model = fund_type_map[data['fund_type']] response_model = fund_type_map[data['fund_type']]
await fund_collect.insert_one(data) await fund_collect.insert_one(data)
# await calculate_nav_task(data['id'], scheduler, fund_collect, user.id) background_tasks.add_task(calculate_nav_task, data["id"])
scheduler = request.app.state.scheduler
job_id = f"calculate_nav_{data['id']}"
time_obj = datetime.datetime.strptime(data["settlement_time"], "%H:%M")
scheduler.add_job(
calculate_nav_task,
trigger="cron",
timezone=pytz.UTC,
hour=time_obj.hour,
minutes=time_obj.minute,
args=[fund_collect, data["id"]],
id=job_id,
misfire_grace_time=60*60
)
return Response[response_model](data=response_model(**data)) return Response[response_model](data=response_model(**data))
......
...@@ -4,18 +4,10 @@ from motor.core import AgnosticCollection ...@@ -4,18 +4,10 @@ from motor.core import AgnosticCollection
from dependencies import get_nav_collect from dependencies import get_nav_collect
from model import SortParams, FilterTime, Page, PageResponse from model import SortParams, FilterTime, Page, PageResponse
from model.fund import StakingFundNav from model.fund import StakingFundNav
from service.nav import calculate_nav
router = APIRouter() router = APIRouter()
@router.get("/execute/{fund_id}/")
async def test_nav(fund_id):
response = await calculate_nav(fund_id)
return response
@router.get("/{fund_id}/", response_model=PageResponse[StakingFundNav], summary="查询净值记录") @router.get("/{fund_id}/", response_model=PageResponse[StakingFundNav], summary="查询净值记录")
async def nav( async def nav(
fund_id: str, fund_id: str,
......
from apscheduler.schedulers.asyncio import AsyncIOScheduler from datetime import datetime
from fastapi import Depends, APIRouter
from motor.core import AgnosticCollection
import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends, APIRouter, Request
from db import AioMongodbManager from db import AioMongodbManager
from dependencies import get_current_user, get_scheduler, get_fund_collect, get_mongodb_manager from dependencies import get_current_user, get_scheduler, get_fund_collect, get_mongodb_manager
from model import BaseResponse, Response, ErrorResponse from model import BaseResponse, Response, ErrorResponse
...@@ -17,12 +18,24 @@ router = APIRouter() ...@@ -17,12 +18,24 @@ router = APIRouter()
summary='创建基金净值计算任务', summary='创建基金净值计算任务',
description='创建基金净值计算任务') description='创建基金净值计算任务')
async def create( async def create(
request: Request,
fund_id: str, fund_id: str,
background_tasks: BackgroundTasks, background_tasks: BackgroundTasks,
user: User = Depends(get_current_user), user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect),
): ):
background_tasks.add_task(calculate_nav_task, fund_collect, fund_id) scheduler = request.app.state.scheduler
job_id = f"calculate_nav_{fund_id}"
time_obj = datetime.strptime("08:00", "%H:%M")
scheduler.add_job(
calculate_nav_task,
trigger="cron",
timezone=pytz.UTC,
hour=time_obj.hour,
minute=time_obj.minute,
args=[fund_id],
id=job_id
)
# background_tasks.add_task(calculate_nav_task, fund_id)
return BaseResponse(message='创建成功') return BaseResponse(message='创建成功')
......
from motor.core import AgnosticCollection import datetime
import pytz
from exception.db import NotFundError from exception.db import NotFundError
from model.bill import StakingBill from model.bill import StakingBill
...@@ -17,6 +19,25 @@ from service.beacon import BeaconChaService ...@@ -17,6 +19,25 @@ from service.beacon import BeaconChaService
from tools.time_helper import utc_now_timestamp from tools.time_helper import utc_now_timestamp
def get_next_execute_time(time_str):
# 将字符串转换为时间对象
time_obj = datetime.datetime.strptime(time_str, '%H:%M')
# 获取当前时间和 UTC 时间
now = datetime.datetime.now(pytz.utc)
utc = pytz.utc
# 如果当前时间大于输入时间,则计算下一天的输入时间
if now.time() > time_obj.time():
next_day = now + datetime.timedelta(days=1)
next_time = next_day.replace(hour=time_obj.hour, minute=time_obj.minute)
else:
next_time = now.replace(hour=time_obj.hour, minute=time_obj.minute)
# 将下一次运行时间转换为 UTC 时间
next_time_utc = utc.localize(next_time)
return next_time_utc
async def delete_task(job_id, scheduler): async def delete_task(job_id, scheduler):
if scheduler.get_job(job_id): if scheduler.get_job(job_id):
scheduler.remove_job(job_id) scheduler.remove_job(job_id)
...@@ -30,13 +51,14 @@ async def delete_nav_task(fund_id, scheduler): ...@@ -30,13 +51,14 @@ async def delete_nav_task(fund_id, scheduler):
await delete_task(job_id, scheduler) await delete_task(job_id, scheduler)
async def calculate_nav_task(fund_collect: AgnosticCollection, fund_id=None): async def calculate_nav_task(fund_id=None):
""" """
创建净值计算任务 创建净值计算任务
:param fund_id: :param fund_id:
:param fund_collect:
:return: :return:
""" """
from main import app
fund_collect = get_fund_collect(app.state.mongodb_manager)
if fund_id: if fund_id:
await calculate_nav(fund_id) await calculate_nav(fund_id)
else: else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment