Просто начал работать с Alpaca API (также использует службу Polygon IO) и столкнулся со следующей проблемой, которая, похоже, должна быть простой, но я застрял. Код инициализирует и правильно выводит всю ожидаемую информацию. Проблема в том, что когда я вызываю «отказаться от подписки», сообщение об ошибке не появляется, а сообщения просто продолжают приходить.
Интересно то, что я также пробовал вызывать deregister () и даже просто close () - все безрезультатно. Я также пробовал настраивать шаблоны различными способами - опять же, без ошибок, трудно понять. Но строка «Закрытие соединения для» плюс символ тикера просто продолжает приходить, приходить и приходить.
Я подозреваю, что звонки теряются в событии StreamConn / asyncio l oop где-то и просто не запускаются, но я не совсем уверен, как лучше всего решить эту проблему. Любые мысли приветствуются. Python 3.7, на случай, если это не очевидно.
Обратите внимание, что я не делаю ничего интересного с этим скриптом, кроме печати данных потока по мере их поступления, чтобы понять, почему я ' м не удалось закрыть соединение. В документации указано (и код показывает), что мы должны иметь возможность подписаться на дополнительные каналы в середине цикла, но пока это тоже не сработало. Конечное желаемое конечное состояние - это возможность подписаться на дополнительные каналы и отказаться от подписки на существующие каналы на постоянной основе. Поскольку библиотеки используют asyncio, можно было бы подумать ...
Источник StreamConn Alpaca находится здесь: https://github.com/alpacahq/alpaca-trade-api-python/blob/master/alpaca_trade_api/stream2.py Версия Alpaca расширяет исходный класс Polygon, здесь : https://github.com/alpacahq/alpaca-trade-api-python/blob/master/alpaca_trade_api/polygon/streamconn.py
from alpaca_trade_api import StreamConn
from alpaca_config import AlpacaAPIConfig
import datetime
script_start = datetime.datetime.now()
conn = StreamConn(AlpacaAPIConfig.get_public_api_key(), AlpacaAPIConfig.get_secret_key())
symbols = ['AAPL', 'MSFT']
async def handle_signal(channel, symbol, value):
if (datetime.datetime.now() - script_start).seconds > 30:
print('Closing connection for ' + str(symbol))
await conn.unsubscribe([r'^T.' + str(symbol), r'^AM.' + str(symbol)])
else:
print(str(channel) + '.' + str(symbol))
print(str(value))
print(' ')
@conn.on(r'^AM.*$', symbols) # AM. denotes minute bars, symbols is the list of tickers to listen for
async def on_bar(conn, channel, bar):
symbol = bar.symbol
close = float(bar.close)
print('-- AM Hit --')
await handle_signal(channel, symbol, close)
@conn.on(r'^T.*$', symbols) # T. denotes a trade event, symbols is the list of tickers to listen for
async def on_trade(conn, channel, trade):
symbol = str(trade.symbol)
price = str(trade.price)
print('-- T Hit --')
await handle_signal(channel, symbol, price)
def kickoff():
conn.run([
'T.*',
'AM.*'
])
if __name__ == '__main__':
kickoff()
Копия вывода, на всякий случай, это полезно:
-- T Hit --
T.AAPL
382.31
-- T Hit --
T.AAPL
382.31
-- AM Hit --
AM.AAPL
382.31
-- AM Hit --
AM.MSFT
213.32
-- T Hit --
T.MSFT
213.45
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for MSFT
-- T Hit --
Closing connection for AAPL
-- T Hit --
Closing connection for AAPL
ОБНОВЛЕНО Очевидно, .run
и .subscribe / .unsubscribe
не Не играем вместе. Я изменил функцию kickoff()
на следующее:
async def kickoff():
await conn.subscribe(['T.AAPL', 'T.MSFT', 'AM.AAPL', 'AM.MSFT'])
и функцию __main__()
на:
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.create_task(kickoff())
loop.run_forever()
Теперь, используя .subscribe()
вместо .run
, поведение тикера такое, как ожидалось (останавливается при вызове .unsubscribe()
). Однако теперь мне придется разобраться в событии l oop part ...