Как обрабатывать SQLAlchemy Disconnects? - PullRequest
1 голос
/ 28 апреля 2020

Я использую sqlalchemy, чтобы установить sh соединение с Presto, а затем использую dask для распараллеливания чтения запросов. Проблема в том, что код не может завершить работу sh из-за TimeoutError. Каков наилучший способ реализовать логи повторных попыток c при передаче SQLAlchemy Engine в Pandas?

Пример кода:

import dask.dataframe as dd
from dask import delayed

def parallelize_query(date):
    engine = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                                   connect_args={'protocol': 'https',
                                                 'requests_kwargs': {'verify': key}})

    q = query.format(date)
    data = pd.read_sql(q, conn)
    engine.dispose()
    return data

ddf = dd.from_delayed([delayed(parallelize_query)(d) for d in recent_dates])

Ошибка:

ConnectionError: HTTPSConnectionPool(host='<host>', port=<port>): Max retries exceeded with url: /v1/statement/executing/20200427_231652_00087_rfnzc/yb90eab8fdeee4de5656afabc011dd8724e77bdcb/13761 (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x913f95e90>: Failed to establish a new connection: [Errno 60] Operation timed out'))
...