添加CMC报价获取

parent 4e2c7e47
# Python版本基金后台
## 方式一运行
### 1.配置中科大加速源 (推荐)
```shell
pip3 config set global.index-url https://pypi.mirrors.ustc.edu.cn/simple/
# pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/
```
### 2.安装依赖
```shell
pip3 install -r requirements.txt
```
### 3.启动
```shell
python3 main.py
```
## 方式二运行 Docker Compose
```shell
docker compose up --build -d
```
...@@ -6,7 +6,7 @@ from db.redis_helper import AioRedisManager ...@@ -6,7 +6,7 @@ from db.redis_helper import AioRedisManager
def register_mongodb(app): def register_mongodb(app):
mongodb_manager = AioMongodbManager() mongodb_manager = AioMongodbManager()
mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund') mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund')
mongodb_manager.setup_pool(settings.jasper_mongodb_uri, 'jasper') # mongodb_manager.setup_pool(settings.jasper_mongodb_uri, 'jasper')
app.state.mongodb_manager = mongodb_manager app.state.mongodb_manager = mongodb_manager
......
...@@ -45,7 +45,7 @@ def get_bill_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_ma ...@@ -45,7 +45,7 @@ def get_bill_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_ma
def get_hour_price_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_manager)) -> AgnosticCollection: def get_hour_price_collect(mongodb_manager: AioMongodbManager = Depends(get_mongodb_manager)) -> AgnosticCollection:
# return mongodb_manager.get_client(name='jasper', db='TradeCenter_c', collect='HourMarketQuotes') # todo 数据库版本问题 # return mongodb_manager.get_client(name='jasper', db='TradeCenter_c', collect='HourMarketQuotes') # todo 数据库版本问题
return mongodb_manager.get_client(name='jasper', db='Market', collect='HourMarketQuotes') return mongodb_manager.get_client(name='pyfund', db='Market', collect='HourOHLCV')
# 获取redis Client # 获取redis Client
......
...@@ -2,4 +2,6 @@ from exception import MyException ...@@ -2,4 +2,6 @@ from exception import MyException
class RequestHttpException(MyException): class RequestHttpException(MyException):
pass message = '请求失败'
status = 500
...@@ -19,9 +19,6 @@ from tools.scheduler import create_scheduler ...@@ -19,9 +19,6 @@ from tools.scheduler import create_scheduler
if settings.env != 'LOCAL': if settings.env != 'LOCAL':
openapi_prefix = '/pyfund' openapi_prefix = '/pyfund'
debug = False debug = False
elif settings.env == "TEST":
openapi_prefix = '/pyfund'
debug = False
else: else:
openapi_prefix = '' openapi_prefix = ''
debug = True debug = True
......
...@@ -12,3 +12,4 @@ cryptography==39.0.2 ...@@ -12,3 +12,4 @@ cryptography==39.0.2
pymongo~=4.3.3 pymongo~=4.3.3
httpx~=0.23.3 httpx~=0.23.3
APScheduler~=3.10.1 APScheduler~=3.10.1
aiofiles==23.1.0
\ No newline at end of file
# return datetime.datetime.fromisoformat(
# self.cmc_map[int(self.fix_coin_id)]['first_historical_data'].split('.')[
# 0]).replace(minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
import asyncio
import datetime
import json
import aiofiles
import pytz
from loguru import logger
from configs import settings
from db import AioMongodbManager
from dependencies import get_hour_price_collect
from tools.http_helper import aio_request
class CMCPrice:
def __init__(self, mongodb_manager: AioMongodbManager = None):
self.mongodb_manager = mongodb_manager
self.cmc_map = None
self.proxies = 'http://127.0.0.1:10801'
self.quick_search_url = 'https://web-api.coinmarketcap.com/v1/cryptocurrency/map'
self.cmc_api_key = "99ddd4c0-561c-4e7a-b579-d9ded29b6ba0"
self.use_api_key = False
async def get_quick_search_map(self, from_cache=True):
if not from_cache:
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36'
}
response = await aio_request(url=self.quick_search_url, proxies=self.proxies, headers=headers)
data = {item['symbol']: item for item in response['data']}
async with aiofiles.open('cmc_map.json', 'w') as f:
await f.write(json.dumps(data))
try:
async with aiofiles.open('cmc_map.json') as f:
data = await f.read()
json_data = json.loads(data)
self.cmc_map = json_data
except Exception as e:
logger.warning(e)
return self.cmc_map
async def load_cmc_map(self):
await self.get_quick_search_map()
if not self.cmc_map:
await self.get_quick_search_map(from_cache=False)
return self.cmc_map
def filter_symbol_info(self, symbol):
symbol_info = self.cmc_map.get(symbol)
assert symbol_info, Exception(f'Not match [{symbol}] in {self.quick_search_url}')
return symbol_info
def query_added_date(self, symbol):
symbol_info = self.filter_symbol_info(symbol)
try:
return datetime.datetime.fromisoformat(symbol_info['first_historical_data'].split('.')[
0]).replace(minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
except Exception as e:
logger.error(f'查询起始时间失败,使用默认时间 {symbol} {e}')
return datetime.datetime.fromisoformat('2023-01-01T00:00:00').replace(tzinfo=pytz.UTC)
async def delete_exist_data(self, symbol, filtered_data, hour_price_collect):
delete_time = set([])
for item in filtered_data:
delete_time.add(item['time'])
if delete_time:
await hour_price_collect.delete_many({'symbol': symbol, 'time': {"$in": list(delete_time)}})
logger.error(f'[删除存在数据] {symbol} [{len(delete_time)}]')
async def save_history_data(self, symbol, start_time=None):
end_time = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC, microsecond=0, second=0,
minute=0) - datetime.timedelta(hours=1)
if not self.cmc_map:
await self.load_cmc_map()
hour_price_collect = get_hour_price_collect(self.mongodb_manager)
start_time = (start_time or self.query_added_date(symbol)).replace(tzinfo=pytz.UTC, second=0, microsecond=0)
next_start = None
finish = False
while not finish:
for_save_data = await self.query_ohlcv_from_api(symbol, start_time)
# 过滤数据在开始和结束时间范围内
filtered_data = []
for item in for_save_data:
next_start = item['time']
if end_time >= item['time'] >= start_time:
filtered_data.append(item)
if next_start >= end_time:
finish = True
# for_save_data = [item for item in for_save_data if (end_time >= item['time'] >= start_time)]
if not filtered_data:
break
# 删除存在数据
await self.delete_exist_data(symbol, filtered_data, hour_price_collect)
insert_res = await hour_price_collect.insert_many(filtered_data)
inserted = len(insert_res.inserted_ids)
logger.info(f'[小时数据写入成功] [{symbol}] [{inserted}]')
start_time = next_start
logger.info(f'[小时数据完成] [{symbol}]')
async def query_ohlcv_from_api(self, symbol, start_time=None, count=5):
if not self.cmc_map:
await self.load_cmc_map()
url = f'https://web-api.coinmarketcap.com/v2/cryptocurrency/ohlcv/historical?interval=1h&time_period=hourly&count={count}&symbol={symbol}&time_start={int(start_time.timestamp())}'
res_data = await aio_request(
url=url, proxies=self.proxies,
headers={'X-CMC_PRO_API_KEY': self.cmc_api_key} if self.use_api_key else None
)
for_save_data = []
for item in res_data['data'][symbol][0]['quotes']:
ohlcv_item = item['quote']['USD']
del ohlcv_item['timestamp']
ohlcv_item.update({
'symbol': symbol,
'time': datetime.datetime.fromisoformat(item['time_open']),
'create_time': datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
})
# next_start = ohlcv_item['time']
for_save_data.append(ohlcv_item)
return for_save_data
if __name__ == '__main__':
mongodb_manager = AioMongodbManager()
mongodb_manager.setup_pool(settings.py_fund_mongodb_uri, 'pyfund')
asyncio.run(CMCPrice(mongodb_manager).save_history_data('BTC'))
...@@ -8,7 +8,7 @@ from exception.http import RequestHttpException ...@@ -8,7 +8,7 @@ from exception.http import RequestHttpException
async def aio_request(url, method='GET', json_res=True, **kwargs): async def aio_request(url, method='GET', json_res=True, **kwargs):
try: try:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient(proxies=kwargs.pop("proxies", None)) as client:
method = method.upper() method = method.upper()
response = await client.request(method=method, url=url, **kwargs) response = await client.request(method=method, url=url, **kwargs)
res = response.content res = response.content
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment