У меня есть список моих записей. Для каждой из моих записей мне нужно несколько тяжелый расчет, поскольку я создаю обратный индекс в redis. Для достижения записи несколько команд redis выполняются в конвейере (100 с sadd
+ 1 set
).
Я хочу распараллелить эту часть создания индекса (используя joblib), но не смог этого сделать. Первая проблема заключалась в том, что я хотел передать соединение redis для каждой работы, но это не работает, так как joblib хочет его сериализовать, что не работает. Так что просто отправьте хост / порт, и каждая порция создаст свое собственное соединение.
def heavy_calc_insert(value, host, port)
r = redis.Redis(host=host, port=port)
#... some calc
pipe = r.pipeline()
for bit in bits:
key = "bit:" + str(bit)
pipe.sadd(key, idx_value)
pipe.set(idx_value, id)
pipe.execute()
Parallel(n_jobs=4)(delayed(heavy_calc_insert)(value, host, port) for value in values)
Однако с этим кодом я довольно быстро получаю ConnectionError
:
ConnectionError: Connection closed by server.
Я предполагаю, что я форма sone имеет слишком много соединений.
Как я могу решить эту проблему?