Commit 55cc38bf authored by Confusion-ymc's avatar Confusion-ymc

修改定时任务

parent 8391a5f1
from fastapi import APIRouter from fastapi import APIRouter
from api import bill, nav, fund, group from api import bill, nav, fund, group, node
api_router = APIRouter() api_router = APIRouter()
api_router.include_router(bill.router, prefix="/bill", tags=["账单"]) api_router.include_router(bill.router, prefix="/bill", tags=["账单"])
api_router.include_router(nav.router, prefix="/nav", tags=["净值"]) api_router.include_router(nav.router, prefix="/nav", tags=["净值"])
api_router.include_router(fund.router, prefix="/fund", tags=["基金"]) api_router.include_router(fund.router, prefix="/fund", tags=["基金"])
api_router.include_router(node.router, prefix="/node", tags=["节点"])
api_router.include_router(group.router, prefix="/group", tags=["用户分组"]) api_router.include_router(group.router, prefix="/group", tags=["用户分组"])
import pytz
from apscheduler.triggers import interval
import dependencies
import service.node
from model import BaseResponse
from fastapi import APIRouter, Depends
router = APIRouter()
@router.post('/',
response_model=BaseResponse,
summary='创建节点刷新任务【测试】',
description='创建节点刷新任务')
async def create(
schedular: dependencies.AsyncIOScheduler = Depends(dependencies.get_schedular)
):
schedular.add_job(service.node.refresh_status, trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC),
misfire_grace_time=10)
return BaseResponse(data='创建成功')
from typing import Dict from typing import Dict
from urllib.parse import quote_plus
import pytz import pytz
from bson import CodecOptions from bson import CodecOptions
...@@ -34,6 +33,3 @@ def register_mongodb(app): ...@@ -34,6 +33,3 @@ def register_mongodb(app):
mongodb_manger = AioMongodbManager() mongodb_manger = AioMongodbManager()
mongodb_manger.setup_pool(settings.mongodb, 'pyfund') mongodb_manger.setup_pool(settings.mongodb, 'pyfund')
app.state.mongodb_manger = mongodb_manger app.state.mongodb_manger = mongodb_manger
from urllib.parse import quote
quote('mongodb://root:ETHQig66tzxoZc+wuIPEUTMVsY@13.115.26.128:27018')
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Security from fastapi import Security
from fastapi.security import HTTPAuthorizationCredentials from fastapi.security import HTTPAuthorizationCredentials
...@@ -15,3 +16,7 @@ def get_current_user(credentials: HTTPAuthorizationCredentials = Security(jwt_to ...@@ -15,3 +16,7 @@ def get_current_user(credentials: HTTPAuthorizationCredentials = Security(jwt_to
def get_mongodb_manager(request: Request) -> AioMongodbManager: def get_mongodb_manager(request: Request) -> AioMongodbManager:
return request.app.state.mongodb_manger return request.app.state.mongodb_manger
def get_schedular(request: Request) -> AsyncIOScheduler:
return request.app.state.schedular
...@@ -14,6 +14,7 @@ from db.mongodb_helper import register_mongodb ...@@ -14,6 +14,7 @@ from db.mongodb_helper import register_mongodb
from exception import MyException from exception import MyException
from model import ErrorResponse from model import ErrorResponse
from tools.jwt_tools import get_identify_key from tools.jwt_tools import get_identify_key
from tools.scheduler import create_schedular
if settings.env != 'LOCAL': if settings.env != 'LOCAL':
openapi_prefix = '/coinsdataapiv2' openapi_prefix = '/coinsdataapiv2'
...@@ -59,6 +60,10 @@ async def startup(): ...@@ -59,6 +60,10 @@ async def startup():
# 添加路由 # 添加路由
app.include_router(api_router) app.include_router(api_router)
# 添加定时任务
app.state.schedular = create_schedular(settings.mongodb)
app.state.schedular.start()
if __name__ == '__main__': if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000) uvicorn.run('main:app', host='0.0.0.0', port=8000)
from tools.http_helper import aio_request
async def refresh_status():
res = await aio_request(url='https://www.baidu.com', json_res=False)
print(res)
\ No newline at end of file
...@@ -6,13 +6,14 @@ from loguru import logger ...@@ -6,13 +6,14 @@ from loguru import logger
from exception.http import RequestHttpException from exception.http import RequestHttpException
async def aio_request(url, method='GET', **kwargs): async def aio_request(url, method='GET', json_res=True, **kwargs):
try: try:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
method = method.upper() method = method.upper()
response = await client.request(method=method, url=url, **kwargs) response = await client.request(method=method, url=url, **kwargs)
content = response.content res = response.content
res = json.loads(content) if json_res:
res = json.loads(res)
logger.info(f'请求成功 [{method}] [{url}]') logger.info(f'请求成功 [{method}] [{url}]')
return res return res
except Exception as e: except Exception as e:
......
import asyncio import asyncio
import datetime
import time
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED, JobEvent, JobExecutionEvent
from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger from loguru import logger
from pymongo import MongoClient from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/') from configs import settings
jobstore = MongoDBJobStore(client=client, database='scheduler', collection='jobs')
scheduler = AsyncIOScheduler(jobstores={'mongo': jobstore})
async def job_function():
await asyncio.sleep(3)
print('Hello World')
async def blocking_function():
while True:
print('Blocking')
await asyncio.sleep(2)
def add_job(job_id, func, trigger=None, args=None, kwargs=None):
"""添加job"""
print(f"添加job - {job_id}")
scheduler.add_job(id=job_id, func=func, trigger=trigger, args=args, kwargs=kwargs, jobstore='mongo')
def remove_job(job_id):
"""移除job"""
scheduler.remove_job(job_id)
print(f"移除job - {job_id}")
def create_schedular(store_mongodb_url) -> AsyncIOScheduler:
def job_listener(event: JobExecutionEvent):
job = scheduler.get_job(event.job_id)
if event.code == EVENT_JOB_MISSED:
logger.error(
f"[Scheduler][Error][错过执行时间] {job.name}|{job.trigger}|设定执行时间:{event.scheduled_run_time}|当前时间:{datetime.datetime.utcnow()}")
elif not event.exception:
logger.info(
f"[Scheduler][Success] {job.name}|{job.trigger}|设定执行时间:{event.scheduled_run_time}")
else:
logger.error(
f"[Scheduler][Error] {job.name}|{job.trigger}|设定执行时间:{event.scheduled_run_time}|{event.exception}|{event.traceback}")
def pause_job(job_id): scheduler: AsyncIOScheduler = AsyncIOScheduler(
"""停止job""" jobstores={
scheduler.pause_job(job_id) 'default': MongoDBJobStore(client=MongoClient(store_mongodb_url), database='scheduler', collection='jobs')},
print(f"停止job - {job_id}") max_workers=50
)
scheduler.add_listener(job_listener, EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_EXECUTED)
return scheduler
def resume_job(job_id): def task_job():
"""恢复job""" print(time.time())
scheduler.resume_job(job_id)
print(f"恢复job - {job_id}")
def get_jobs(): async def test():
"""获取所有job信息,包括已停止的""" scheduler = create_schedular(settings.mongodb)
res = scheduler.get_jobs()
print(f"所有job - {res}")
def print_jobs():
print(f"详细job信息")
scheduler.print_jobs()
def start():
"""启动调度器"""
scheduler.start() scheduler.start()
scheduler.remove_all_jobs()
scheduler.print_jobs()
def shutdown(): # scheduler.add_job(func=task_job, trigger=interval.IntervalTrigger(seconds=5, timezone=pytz.UTC),
"""关闭调度器""" # misfire_grace_time=10)
scheduler.shutdown() while True:
await asyncio.sleep(1)
if __name__ == '__main__': if __name__ == '__main__':
# scheduler.add_job(job_function, trigger=IntervalTrigger(seconds=5), id='my_job', jobstore='mongo') asyncio.run(test())
scheduler.start()
loop = asyncio.get_event_loop()
task = loop.create_task(blocking_function())
loop.run_until_complete(asyncio.wait([task]))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment