Commit cfb90ebf authored by Confusion-ymc's avatar Confusion-ymc

优化定时任务

parent a7f47c5f
from fastapi import APIRouter
from api import bill, nav, fund, group, node
from api import bill, nav, fund, group, node, scheduler
api_router = APIRouter()
......@@ -8,3 +8,4 @@ api_router.include_router(nav.router, prefix="/nav", tags=["净值"])
api_router.include_router(fund.router, prefix="/fund", tags=["基金"])
api_router.include_router(node.router, prefix="/node", tags=["节点"])
api_router.include_router(group.router, prefix="/group", tags=["用户分组"])
api_router.include_router(scheduler.router, prefix="/scheduler", tags=["定时任务"])
......@@ -2,9 +2,7 @@ import datetime
from typing import Union
from redis import asyncio as aioredis
import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers import interval
from fastapi import APIRouter, Depends
from motor.core import AgnosticCollection
from pymongo import ReturnDocument
......@@ -12,8 +10,8 @@ from pymongo import ReturnDocument
from exception.db import NotFundError
from model import Response
from model.fund import FundType, CreateFund, StakingFund, NormalFund, UpdateFund
from dependencies import get_current_user, get_fund_collect, get_scheduler, get_cmc_price_redis
from service.nav import calculate_nav
from dependencies import get_current_user, get_fund_collect, get_scheduler
from service.scheduler import calculate_nav_task
from tools.jwt_tools import User
router = APIRouter()
......@@ -27,7 +25,6 @@ async def create(
create_fund: CreateFund,
user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect),
redis_client: aioredis.Redis = Depends(get_cmc_price_redis),
scheduler: AsyncIOScheduler = Depends(get_scheduler)
):
if create_fund.fund_type == FundType.staking:
......@@ -42,10 +39,7 @@ async def create(
await fund_collect.insert_one(insert_data)
scheduler.add_job(func=calculate_nav, args=(create_model.id,),
trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC),
misfire_grace_time=10)
scheduler.print_jobs()
await calculate_nav_task(create_model.id, scheduler, fund_collect, user.id)
return response
......
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends, APIRouter
from motor.core import AgnosticCollection
from dependencies import get_current_user, get_scheduler, get_fund_collect
from model import BaseResponse
from service.scheduler import calculate_nav_task, delete_task
from tools.jwt_tools import User
router = APIRouter()
@router.post('/',
summary='创建基金净值计算任务',
description='创建基金净值计算任务')
async def create(
fund_id: str,
user: User = Depends(get_current_user),
fund_collect: AgnosticCollection = Depends(get_fund_collect),
scheduler: AsyncIOScheduler = Depends(get_scheduler)
):
await calculate_nav_task(fund_id, scheduler, fund_collect, user.id)
return BaseResponse(message='创建成功')
@router.delete('/',
summary='停止定时任务',
description='停止定时任务')
async def stop(
job_id: str,
user: User = Depends(get_current_user),
scheduler: AsyncIOScheduler = Depends(get_scheduler)
):
await delete_task(job_id, scheduler)
return BaseResponse(message='停止成功')
from starlette import status
from exception import MyException
class TaskExistError(MyException):
status = status.HTTP_500_INTERNAL_SERVER_ERROR
message = '任务已存在'
from loguru import logger
from redis import asyncio as aioredis
from motor.core import AgnosticCollection
from dependencies import get_fund_collect, get_cmc_price_redis
from model.fund import FundStatus
......@@ -15,3 +13,4 @@ async def calculate_nav(fund_id):
for coin_item in fund_data['assets']:
data = await redis_client.hget(f'CMC:CoinPrice:{coin_item["currency"]}', 'Price')
print(f'获取到报价 {data}')
# todo 未完成具体实现
import pytz
from apscheduler.triggers import interval
from motor.core import AgnosticCollection
from exception.db import NotFundError
from exception.schecular import TaskExistError
from service.nav import calculate_nav
async def delete_task(job_id, schedular):
if schedular.get_job(job_id):
schedular.remove_job(job_id)
else:
raise NotFundError()
schedular.print_jobs()
async def calculate_nav_task(fund_id, schedular, fund_collect: AgnosticCollection, user_id=None):
"""
创建净值计算任务
:param schedular:
:param fund_id:
:param fund_collect:
:param user_id:
:return:
"""
query = {'id': fund_id}
if user_id:
query.update({'user_id': user_id})
res = await fund_collect.find_one()
assert res, NotFundError()
job_id = f'calculate_nav_{fund_id}'
if schedular.get_job(job_id):
raise TaskExistError
schedular.add_job(func=calculate_nav, id=job_id, args=(fund_id,),
trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC),
misfire_grace_time=10)
schedular.print_jobs()
import asyncio
import datetime
import time
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED, JobEvent, JobExecutionEvent
from apscheduler import events
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED, JobEvent, JobExecutionEvent, \
EVENT_JOB_ADDED, EVENT_JOB_SUBMITTED, EVENT_JOB_REMOVED, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_MODIFIED
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger
......@@ -10,27 +11,45 @@ from pymongo import MongoClient
from configs import settings
event_map = {
2 ** 0: "EVENT_SCHEDULER_STARTED",
2 ** 0: "EVENT_SCHEDULER_START",
2 ** 1: "EVENT_SCHEDULER_SHUTDOWN",
2 ** 2: "EVENT_SCHEDULER_PAUSED",
2 ** 3: "EVENT_SCHEDULER_RESUMED",
2 ** 4: "EVENT_EXECUTOR_ADDED",
2 ** 5: "EVENT_EXECUTOR_REMOVED",
2 ** 6: "EVENT_JOBSTORE_ADDED",
2 ** 7: "EVENT_JOBSTORE_REMOVED",
2 ** 8: "EVENT_ALL_JOBS_REMOVED",
2 ** 9: "EVENT_JOB_ADDED",
2 ** 10: "EVENT_JOB_REMOVED",
2 ** 11: "EVENT_JOB_MODIFIED",
2 ** 12: "EVENT_JOB_EXECUTED",
2 ** 13: "EVENT_JOB_ERROR",
2 ** 14: "EVENT_JOB_MISSED",
2 ** 15: "EVENT_JOB_SUBMITTED",
2 ** 16: "EVENT_JOB_MAX_INSTANCES"
}
def create_scheduler(store_mongodb_url) -> AsyncIOScheduler:
def job_listener(event: JobExecutionEvent):
job = scheduler.get_job(event.job_id)
if event.code == EVENT_JOB_MISSED:
if getattr(event, "exception", None):
logger.error(
f"[Scheduler][Error][错过执行时间] {job.name}|{job.trigger}|设定执行时间:{event.scheduled_run_time}|当前时间:{datetime.datetime.utcnow()}")
elif not event.exception:
print(event)
logger.info(
f"[Scheduler][Success] {job.name}|{job.trigger}|设定执行时间:{event.scheduled_run_time}")
f"[Scheduler] [{event.job_id}] [{event_map.get(event.code, event.code)}]|{event.exception}|{event.traceback}")
else:
logger.error(
f"[Scheduler][Error] {job.name}|{job.trigger}|设定执行时间:{event.scheduled_run_time}|{event.exception}|{event.traceback}")
logger.info(
f'[Scheduler] [{event.job_id}] [{event_map.get(event.code, event.code)}]')
scheduler: AsyncIOScheduler = AsyncIOScheduler(
jobstores={
'default': MongoDBJobStore(client=MongoClient(store_mongodb_url), database='scheduler', collection='jobs')},
max_workers=50
)
scheduler.add_listener(job_listener, EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_EXECUTED)
scheduler.add_listener(job_listener,
mask=(EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED |
EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_SUBMITTED | EVENT_JOB_MAX_INSTANCES))
return scheduler
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment