redis: пакетная вставка с использованием joblib - PullRequest
1 голос
/ 22 января 2020

У меня есть список моих записей. Для каждой из моих записей мне нужно несколько тяжелый расчет, поскольку я создаю обратный индекс в redis. Для достижения записи несколько команд redis выполняются в конвейере (100 с sadd + 1 set).

Я хочу распараллелить эту часть создания индекса (используя joblib), но не смог этого сделать. Первая проблема заключалась в том, что я хотел передать соединение redis для каждой работы, но это не работает, так как joblib хочет его сериализовать, что не работает. Так что просто отправьте хост / порт, и каждая порция создаст свое собственное соединение.

def heavy_calc_insert(value, host, port)

    r = redis.Redis(host=host, port=port)

    #... some calc

    pipe = r.pipeline()

    for bit in bits:
        key = "bit:" + str(bit)
        pipe.sadd(key, idx_value)

    pipe.set(idx_value, id)
    pipe.execute()

Parallel(n_jobs=4)(delayed(heavy_calc_insert)(value, host, port) for value in values)

Однако с этим кодом я довольно быстро получаю ConnectionError:

ConnectionError: Connection closed by server.

Я предполагаю, что я форма sone имеет слишком много соединений.

Как я могу решить эту проблему?

1 Ответ

0 голосов
/ 22 января 2020

Использовать пул соединений. https://github.com/andymccurdy/redis-py/blob/master/README.rst#connection -pools

Создайте пул из heavy_calc_insert и передайте пул вместо хоста и порта.

pool = redis.ConnectionPool(host=host, port=port, db=0)
...
def heavy_calc_insert(value, pool)
    r = redis.Redis(connection_pool=pool)
    ...

Redis по умолчанию имеет 10000 подключений ограничить, и он не закроет неиспользуемые соединения.

Пул соединений будет держать соединения, созданные под контролем, и даст вам лучшую производительность.

...