Мне нужно создать токен сеанса и использовать этот токен в запросе для получения данных из API обмена ставками в спичечной коробке. Общая задача, представленная ниже, относится к задаче celerybeat, которая выполняется каждые две секунды и получает цены на события. Это функция, которую я имею ниже, чтобы получить оценку. Проблема заключается в том, что если я буду входить в систему каждый раз с помощью mb.login()
перед запуском задачи, у меня будет заблокирован ip из-за слишком большого количества входов в систему, мне нужно создать токен сеанса и вызвать его как mb.get_session_token.get_events
Я использую готовую оболочку, которая находится на github, для взаимодействия с API Matchbook. https://github.com/rozzac90/matchbook
Нет документации, но есть некоторая информация в строках документации, которую я не могу понять, но я думаю, что есть функция для создания токенов сессии, но я не уверен, как их использовать.
Я хотел бы знать, как я могу создать функцию, которая возвращает токен сеанса и передает его моей функции ниже.
Моя функция
from matchbook.apiclient import APIClient
from matchbook.enums import Side, MarketStates, Boolean, Status
@shared_task()
def mb_get_events():
mb = APIClient('username' , 'password')
events = mb.market_data.get_events(sport_ids=[9],states=MarketStates.All,
per_page=200, offset=0,
include_event_participants=Boolean.T,
category_ids=None, price_depth=3, side=Side.All, session=None)
print(events)
Baseclient.py
class BaseClient(object):
def __init__(self, username, password=None, locale=None):
"""
:param username: Matchbook username.
:param password: Password for supplied username, if None will look in for MATCHBOOK_PW in env variables.
"""
self.username = username
self.password = password
self.locale = locale
self.url = 'https://www.matchbook.com'
self.url_beta = 'https://beta.matchbook.com'
self.urn_main = '/bpapi/rest/'
self.urn_edge = '/edge/rest/'
self.session = requests.Session()
self.session_token = None
self.user_id = None
self.exchange_type = ExchangeType.BackLay
self.odds_type = OddsType.Decimal
self.currency = Currency.EUR
self.get_password()
def set_session_token(self, session_token, user_id):
"""Sets session token.
:param session_token: Session token from request.
:param user_id: User Id from the request.
"""
self.session_token = session_token
self.user_id = user_id
def get_password(self):
"""If password is not provided will look in environment
variables for username+'password'
"""
if self.password is None:
if os.environ.get('MATCHBOOK_PW'):
self.password = os.environ.get('MATCHBOOK_PW')
else:
raise PasswordError()
@property
def headers(self):
"""Set headers to be used in API requests."""
return {
'Content-Type': 'application/json',
'Accept': 'application/json',
}
import json
from matchbook.endpoints.baseendpoint import BaseEndpoint
from matchbook.exceptions import AuthError
Keepalive.py
class KeepAlive(BaseEndpoint):
def __call__(self, session=None):
session = session or self.client.session
response = self.request('GET', self.client.urn_main, 'security/session', data=self.data, session=session)
if response.status_code == 200:
pass
elif response.status_code == 401:
response = self.request("POST", self.client.urn_main, 'security/session', data=self.data, session=session)
if response.status_code == 200:
response_json = response.json()
self.client.set_session_token(response_json.get('session-token'), response_json.get('user-id'))
else:
raise AuthError(response)
else:
raise AuthError(response)
def request(self, request_method, urn, method, params={}, data={}, target=None, session=None):
"""
:param request_method: type of request to be sent.
:param urn: matchbook urn to append to url specified.
:param method: Matchbook method to be used.
:param params: Params to be used in request
:param url: define different URL to use.
:param data: data to be sent in request body.
:param target: target key to get from data, if None returns full response.
:param session: Requests session to be used, reduces latency.
"""
session = session or self.client.session
data['session-token'] = self.client.session_token
data['user-id'] = self.client.user_id
request_url = '%s%s%s' % (self.client.url, urn, method)
response = session.request(
request_method, request_url, params=params, data=json.dumps(data), headers=self.client.headers
)
return response
@property
def data(self):
return {'username': self.client.username, 'password': self.client.password}