Commit 51af1633 authored by 陈涛's avatar 陈涛

提交查询pubkey相关接口

parent daecbead
import datetime
import re
import json
import time
from typing import List, Union
from pydantic import BaseModel, Field
from configs import settings
# from bs4 import BeautifulSoup as bs
from tools.http_helper import aio_request
class ValidatorBlock(BaseModel):
epoch: int = Field(None, title="epoch")
slot: int = Field(None, title="slot")
status: str = Field(None, title="status")
timestamp: int = Field(None, title="timestamp")
root_hash: str = Field(None, title="root_hash")
att: int = Field(None, title="att")
dep: int = Field(None, title="dep")
proposers: str= Field(None, title="proposers")
ex: int = Field(None, title="ex")
class ValidatorBlocks(BaseModel):
total: int = Field(None, title="total")
data: List[ValidatorBlock] = Field(None, title="data")
class BeaconChaService:
base_url = settings.beaconcha_url
api_key = "STVubDVzdkxxRGMvZ2E4cTc0TlZP"
start_date = datetime.datetime(year=2020, month=12, day=1)
deposit_abi = {
"inputs": [
{
"internalType": "bytes",
"name": "pubkey",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "withdrawal_credentials",
"type": "bytes"
},
{
"internalType": "bytes",
"name": "signature",
"type": "bytes"
},
{
"internalType": "bytes32",
"name": "deposit_data_root",
"type": "bytes32"
}
],
"name": "deposit",
"outputs": [],
"stateMutability": "payable",
"type": "function"
}
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
async def get_validator(self, indexOrPubkey: Union[int, str]):
"""获取质押信息"""
url = f"{self.base_url}/api/v1/validator/{indexOrPubkey}"
response = await aio_request(
url=url,
method="GET",
headers=self.headers
)
return response
async def get_validator_deposit(self, indexOrPubkey: Union[int, str]):
"""获取质押信息"""
url = f"{self.base_url}/api/v1/validator/{indexOrPubkey}/deposits"
response = await aio_request(
url=url,
method="GET",
headers=self.headers
)
return response
async def get_validator_income(self, indexOrPubkey: Union[int, str]):
"""获取质押收益"""
url = f"{self.base_url}/api/v1/validator/{indexOrPubkey}/performance"
response = await aio_request(
url=url,
method="GET",
headers=self.headers
)
return response
async def get_validator_blocks(self, index: int, start: int = 0, length: int = 10):
t = time.time() * 1000
url = f"https://beaconcha.in/validator/{index}/proposedblocks?draw=1&columns%5B0%5D%5Bdata%5D=0&columns%5B0%5D%5Bname%5D=&columns%5B0%5D%5Bsearchable%5D=true&columns%5B0%5D%5Borderable%5D=true&columns%5B0%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B0%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B1%5D%5Bdata%5D=1&columns%5B1%5D%5Bname%5D=&columns%5B1%5D%5Bsearchable%5D=true&columns%5B1%5D%5Borderable%5D=false&columns%5B1%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B1%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B2%5D%5Bdata%5D=2&columns%5B2%5D%5Bname%5D=&columns%5B2%5D%5Bsearchable%5D=true&columns%5B2%5D%5Borderable%5D=true&columns%5B2%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B2%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B3%5D%5Bdata%5D=3&columns%5B3%5D%5Bname%5D=&columns%5B3%5D%5Bsearchable%5D=true&columns%5B3%5D%5Borderable%5D=false&columns%5B3%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B3%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B4%5D%5Bdata%5D=4&columns%5B4%5D%5Bname%5D=&columns%5B4%5D%5Bsearchable%5D=true&columns%5B4%5D%5Borderable%5D=false&columns%5B4%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B4%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B5%5D%5Bdata%5D=5&columns%5B5%5D%5Bname%5D=&columns%5B5%5D%5Bsearchable%5D=true&columns%5B5%5D%5Borderable%5D=true&columns%5B5%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B5%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B6%5D%5Bdata%5D=6&columns%5B6%5D%5Bname%5D=&columns%5B6%5D%5Bsearchable%5D=true&columns%5B6%5D%5Borderable%5D=true&columns%5B6%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B6%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B7%5D%5Bdata%5D=7&columns%5B7%5D%5Bname%5D=&columns%5B7%5D%5Bsearchable%5D=true&columns%5B7%5D%5Borderable%5D=false&columns%5B7%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B7%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B8%5D%5Bdata%5D=8&columns%5B8%5D%5Bname%5D=&columns%5B8%5D%5Bsearchable%5D=true&columns%5B8%5D%5Borderable%5D=true&columns%5B8%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B8%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B9%5D%5Bdata%5D=9&columns%5B9%5D%5Bname%5D=&columns%5B9%5D%5Bsearchable%5D=true&columns%5B9%5D%5Borderable%5D=true&columns%5B9%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B9%5D%5Bsearch%5D%5Bregex%5D=false&order%5B0%5D%5Bcolumn%5D=0&order%5B0%5D%5Bdir%5D=desc&start={start}&length={length}&search%5Bvalue%5D=&search%5Bregex%5D=false&_={t}"
response = await aio_request(
url=url,
method="GET",
headers=self.headers
)
data = response.result["data"]
blocks = []
for item in data:
epoch = re.findall(r'/epoch/(\d+)">', item[0])[0]
slot = re.findall(r'/slot/(\d+)">', item[1])[0]
status = re.findall(r'>(\w+)</', item[2])[0]
timestamp = re.findall(r'data-timestamp=\"(\d+)\">', item[3])[0]
root_hash = re.findall(r'data-clipboard-text=(\w+)>', item[4])[0]
att = item[5]
dep = item[6]
proposers = item[7]
ex = item[8]
blocks.append(ValidatorBlock(
epoch=epoch,
slot=slot,
status=status,
timestamp=timestamp,
root_hash=root_hash,
att=att,
dep=dep,
proposers=proposers,
ex=ex
))
result = ValidatorBlocks(total=response.result["recordsTotal"], data=blocks)
return result
async def get_daily_stats(self, index: int):
url = f"{self.base_url}/api/v1/validator/stats/{index}"
response = await aio_request(
url=url,
method="GET",
headers=self.headers
)
return response
async def get_eth_deposits(self, indexOrPubkey: Union[int, str]):
url = f"{self.base_url}/api/v1/validator/{indexOrPubkey}/deposits"
response = await aio_request(
url=url,
method="GET",
headers=self.headers
)
return response
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment