У меня возникли некоторые проблемы с пакетом ib_insync
. Я пытаюсь извлечь некоторые данные и получить ConnectionError: Not connected
. Я пытался найти решение на нескольких форумах (также здесь), но безуспешно.
Вот код, который я использую:
import ...
class Historical():
def __init__(self):
...
self._ib = IB()
self._connection = self._ib.connectAsync(host=self._host, port=self._port, clientId=self._client_id, readonly=True)
async def __aenter__(self) -> Historical:
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self._connection.close()
async def get_historical_ticker_data(self) -> None:
if not self._ib.isConnected():
pass
else:
amd = Stock('AMD')
cds = self._ib.reqContractDetails(amd)
contracts = [cd.contract for cd in cds]
dataframe = util.df(contracts).to_string()
if not dataframe is None:
print(dataframe)
Вот основное, как показано ниже:
import ...
util.patchAsyncio()
async def main() -> None:
async with AsyncExitStack() as stack:
historical = await stack.enter_async_context(Historical())
await historical.get_historical_ticker_data()
asyncio.run(main())
Как мне исправить эту ошибку?