init

parents
.idea/
__pycache__
*.pyc
venv/
*.log
*.DS_Store
FROM python:3.11-slim-bullseye
WORKDIR /code/
RUN pip config set global.index-url https://pypi.mirrors.ustc.edu.cn/simple/
RUN python -m pip install --upgrade pip
ADD requirements.txt /code/
RUN pip3 install -r requirements.txt
ADD . /code
CMD ["python3", "-u", "main.py"]
\ No newline at end of file
# 微信公众号对接chatGPT
## 安装运行
### 安装依赖
1.配置中科大加速源 (推荐)
```shell
pip3 config set global.index-url https://pypi.mirrors.ustc.edu.cn/simple/
```
2.安装库
```shell
pip3 install -r requirements.txt
```
### <span id="setings">设置</span>
#### 1.创建配置文件
```
1.复制一份 config.sample.py
2.重名为 config.py
```
#### 2.修改 config.py 的内容
例如:
```
# chatGPT apikey
chatGPT_apikey = "sk-Ol0g4yBAohi51TdOmpfXHBOv1dLLcXUArFLxK"
# chatGPT接口使用的代理
PROXY = "socks5h://192.168.1.104:10801"
# 公众号设置
pub_app_id = 'wx37b939340a4'
pub_app_secret = '0ec16ab7f35d0cff34524c91'
# 公众号加密配置
pub_token = "yasdasd9" # 请填写你在微信公众平台设置的 Token
EncodingAESKey = 'b1rngzANnE69YPpc5'
# 小程序配置
we_app_id = 'wx65db5e0e17'
we_secret = '723f69bedfe7f873c'
app_token = ''
```
### 运行
```
python3 main.py
```
## Docker 运行
1.[设置](#setings)步骤完成后
2.运行
```
docker-compopse up --build -d
```
\ No newline at end of file
from fastapi import APIRouter
from api import bill, nav, fund, group
api_router = APIRouter()
api_router.include_router(bill.router, prefix="/bill", tags=["账单"])
api_router.include_router(nav.router, prefix="/nav", tags=["净值"])
api_router.include_router(fund.router, prefix="/fund", tags=["基金"])
api_router.include_router(group.router, prefix="/group", tags=["用户分组"])
from fastapi import APIRouter
router = APIRouter()
import uuid
from fastapi import APIRouter
from motor.core import AgnosticCollection
from starlette.requests import Request
from exception.db import NotFundError
from model import success_res, BaseJsonResponse
from model.fund import CreateFund, response_fund_model, ResFund
router = APIRouter()
@router.post('/', response_model=BaseJsonResponse, summary='创建基金', description='创建基金')
async def create(fund: CreateFund, request: Request):
collection: AgnosticCollection = request.app.state.mongodb_manger.get_client(name='pyfund', db='pyfund',
collect='fund')
await collection.insert_one(fund.dict())
return success_res(data=fund.dict())
@router.get('/{id}/', response_model=response_fund_model, summary='查询基金', description='查询基金')
async def get(id: str, request: Request):
collection: AgnosticCollection = request.app.state.mongodb_manger.get_client(name='pyfund', db='pyfund',
collect='fund')
data = await collection.find_one({'id': uuid.UUID(id)})
if not data:
raise NotFundError()
return success_res(data=ResFund(**data))
from fastapi import APIRouter
router = APIRouter()
from fastapi import APIRouter
router = APIRouter()
class Settings:
name = '本地环境'
import os
# 获取环境变量
env = os.getenv("MATRIXONE_ENVIRONMENT", "LOCAL")
if env == 'LOCAL':
from configs.LOCAL import Settings
elif env == 'S':
from configs.S import Settings
elif env == 'TEST':
from configs.TEST import Settings
else:
raise Exception(f'not support ENVIRONMENT {env}')
settings = Settings()
from typing import Dict
import pytz
from bson import CodecOptions
from loguru import logger
from motor.core import AgnosticCollection
from motor.motor_asyncio import AsyncIOMotorClient
class AioMongodbManager:
def __init__(self):
self.mongodb_pool: Dict[str, AsyncIOMotorClient] = {}
def setup_pool(self, mongodb_url, name: str = None):
# addr = mongodb_url.split("@")[1]
# name = name or addr
if name not in self.mongodb_pool:
logger.debug(f'新创建Mongodb连接池 [{mongodb_url}] [{name}]')
else:
logger.warning(f'Mongodb连接池 [{name}] 被覆盖创建')
self.mongodb_pool[name] = AsyncIOMotorClient(mongodb_url)
return self.mongodb_pool[name]
def get_client(self, name, db, collect) -> AgnosticCollection:
if name not in self.mongodb_pool:
raise Exception(f'not set mongodb pool {name}')
return self.mongodb_pool[name][db][collect].with_options(
codec_options=CodecOptions(tz_aware=True, tzinfo=pytz.UTC))
from typing import Dict
from loguru import logger
import aioredis
class AioRedisManager:
def __init__(self):
self.redis_pool: Dict[str, aioredis.ConnectionPool] = {}
self.maxsize = 10
async def setup_pool(self, redis_url, db, name: str = None) -> aioredis.Redis:
"""
:param redis_url: redis://[[username]:[password]]@localhost:6379/0
:param db: 库
:param name: 设置连接名称
:return:
"""
addr = redis_url.split("@")[1]
name = name or f'{addr}/{db}'
if name not in self.redis_pool:
logger.debug(f'新创建Redis连接池 [{addr}] [{name}:{db}]')
else:
logger.warning(f'Redis连接池 [{name}] 被覆盖创建')
await self.redis_pool[name].disconnect()
self.redis_pool[name] = aioredis.ConnectionPool.from_url(
# url=f'redis://{":" + quote(password) + "@" if password else ""}{address}/{db_index}',
url=f'{redis_url}/{db}',
max_connections=self.maxsize,
encoding='UTF-8',
decode_responses=True)
return aioredis.Redis(connection_pool=self.redis_pool[name])
async def get_client(self, name) -> aioredis.Redis:
if name not in self.redis_pool:
raise Exception(f'not set redis pool {name}')
return aioredis.Redis(connection_pool=self.redis_pool[name])
async def close(self):
for name, pool in self.redis_pool.items():
await pool.disconnect()
self.redis_pool = {}
if __name__ == '__main__':
redis_manager = AioRedisManager()
redis_manager.setup_pool(DefaultSettings.REDIS_URL, db=5, name='stock')
conn = redis_manager.get_client('stock')
data = conn.get('stocks')
print(data)
version: "3.0"
services:
py_fund:
build:
dockerfile: Dockerfile
context: .
image: py_fund
restart: always
container_name: py_fund
ports:
- "8000:8000"
# 数据不存在异常
from starlette import status
class NotFundError(Exception):
status_code = status.HTTP_404_NOT_FOUND
class ReqException(Exception):
pass
class RequestPubKeyError(Exception):
pass
class TokenError(Exception):
pass
import uvicorn as uvicorn
from fastapi import FastAPI
from api import api_router
from db.mongodb_helper import AioMongodbManager
app = FastAPI()
mongodb_manger = AioMongodbManager()
mongodb_manger.setup_pool('mongodb://13.115.26.128:27017', 'pyfund')
app.state.mongodb_manger = mongodb_manger
# 添加路由
app.include_router(api_router)
if __name__ == '__main__':
uvicorn.run('main:app', host='0.0.0.0', port=8000)
from typing import Any, List, Optional
from pydantic import BaseModel
from starlette import status
from starlette.responses import JSONResponse
class BaseJsonResponse(BaseModel):
data: Any
message: str = 'success'
status: int = 200
class BasePageJsonResponse(BaseJsonResponse):
page_size: Optional[int]
page_num: Optional[int]
total: Optional[int]
def dynamic_response(data_type):
class DyResponse(BaseJsonResponse):
data: Optional[data_type]
dy_response_model = type('Response' + data_type.__name__, (DyResponse,), {})
return dy_response_model
def list_dynamic_response(data_type):
class DyResponse(BaseJsonResponse):
data: Optional[List[data_type]]
dy_response_model = type('ListResponse' + data_type.__name__, (DyResponse,), {})
return dy_response_model
def page_dynamic_response(data_type):
class DyPageResponse(BasePageJsonResponse):
data: Optional[List[data_type]]
page_size: Optional[int]
page_num: Optional[int]
total: Optional[int]
dy_response_model = type('PageResponse' + data_type.__name__, (DyPageResponse,), {})
return dy_response_model
def success_res(data=None, message='success', status_code=status.HTTP_200_OK, total=None, page_num=None,
page_size=None):
res = {
"data": data,
"message": message,
"status": status_code
}
if total is not None:
res.update({
"total": total,
"page_num": page_num,
"page_size": page_size
})
return res
def error_response(message='failed', status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, data=None):
res = {
"data": data,
"message": message,
"status": status_code
}
return JSONResponse(content=res, status_code=status_code)
import datetime
import uuid
from typing import List
from pydantic import BaseModel, Field
from model import dynamic_response
class Node(BaseModel):
pub_key: str
create_time: datetime.datetime
class CreateFund(BaseModel):
id: uuid.UUID = Field(default_factory=uuid.uuid1, description='基金ID')
name: str = Field(..., description='基金名称')
settlement_time: str = Field(default='08:00')
nav: float = Field(default=1)
nodes: List[Node] = Field(default=[])
create_time: datetime.datetime = Field(default_factory=datetime.datetime.utcnow)
class ResFund(CreateFund):
pass
response_fund_model = dynamic_response(ResFund)
# response_list_branch_model = list_dynamic_response(ResBranch)
uvicorn[standard]==0.20.0
fastapi==0.92.0
loguru==0.6.0
aiohttp==3.8.4
aioredis==2.0.1
motor==2.1
pytz==2022.7.1
PyJWT==2.6.0
pydantic==1.10.6
starlette==0.25.0
\ No newline at end of file
import json
import aiohttp as aiohttp
from aiohttp import ClientTimeout
from loguru import logger
from exception.http import ReqException
async def aio_request(url, method='GET', **kwargs):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
headers.update(kwargs.pop('headers', {}))
method = method.upper()
logger.info(f"[请求内容] url={url}")
try:
async with aiohttp.ClientSession(headers=headers, timeout=ClientTimeout(total=120)) as session:
async with session.request(url=url, method=method, ssl=False, **kwargs) as r:
logger.info(f"[返回状态] url={url}, status={r.status}")
json_body = await r.json()
return json_body
except Exception as e:
raise ReqException(f'请求失败 url: {url} :: {e}')
\ No newline at end of file
import json
import jwt
from fastapi import Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jwt import ExpiredSignatureError
from jwt.algorithms import get_default_algorithms
from loguru import logger
from configs import settings
from exception.token import RequestPubKeyError, TokenError
from tools.http_helper import aio_request
security = HTTPBearer()
async def req_pub_key(url):
"""
请求公钥
:return:
"""
# 请求key
try:
res = await aio_request(url=url)
res = res['result']
logger.info('获取key成功 {}'.format(res))
raw_key_data = res['keys'][0]
algorithms = raw_key_data['alg']
default_pub_key = get_default_algorithms()[algorithms]
public_key = default_pub_key.from_jwk(json.dumps(raw_key_data))
return algorithms, public_key
except Exception as e:
raise RequestPubKeyError(f'jwt key获取失败, url:{url} {e}')
def decode_token(token):
payload = jwt.decode(token,
key=settings.secret_key,
algorithms=[settings.algorithms],
options={'verify_iss': False, 'verify_aud': False})
return payload
def get_current_user(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
token = credentials.credentials
try:
assert credentials.scheme == 'Bearer'
payload = decode_token(token) # options={'verify_signature':False}
user_id: str = payload.get("id")
if user_id is None:
raise TokenError('错误的Token')
return payload
except ExpiredSignatureError:
raise TokenError('Token已过期')
except Exception as e:
logger.warning(e)
raise TokenError('错误的Token')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment