Я обновляю проект с Python 2.7 до 3.6 и использую cassandra-driver
версию 3.14 для подключения к базе данных Cassandra и запускаю исключение при выполнении операторов вставки.
Хороший ресурс предназначен для выполнения многопроцессорных параллельных запросов, которые хорошо работали в Python 2.7 с использованием SSL-соединения с Cassandra.Однако при запуске этой реализации через соединение SSL в 3.6 (из контейнера Docker, базовый образ python 3.6-stretch) я периодически получаю сообщение об ошибке:
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 670, in get
'Unable to complete the operation against any hosts', {}
при выполнении операторов вставки.Отключение SSL (или запуск через SSL в Python 2.7) решает проблему.Мне интересно, есть ли что-то очевидное, что я здесь упускаю?
Мой объект соединения:
ssl_opts = {
'ca_certs': os.path.join(...),
'keyfile': os.path.join(...),
'certfile': os.path.join(...),
'cert_reqs': CERT_REQUIRED
}
# connection object contains host, port, username, password
auth_provider = PlainTextAuthProvider(
username=connection.username,
password=connection.password
)
# .....
# the following Cluster object is returned
# from a build_cassandra_cluster() method:
return Cluster(
contact_points=connection.host,
port=connection.port,
ssl_options=ssl_opts,
auth_provider=auth_provider,
protocol_version=4,
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())
)
Объект QueryManager
выглядит следующим образом:
class QueryManager:
session = None
prepared = None
concurrent = CASSANDRA_IN_FLIGHT_MAX
def __init__(self, stmt):
self.pool = Pool(processes=cpu_count(), initializer=self._setup, initargs=(stmt,))
@classmethod
def _setup(cls, stmt):
cluster = build_cassandra_cluster()
cls.session = cluster.connect("my_keyspace")
cls.session.row_factory = tuple_factory
cls.prepared = cls.session.prepare(stmt)
def close_pool(self):
self.pool.close()
self.pool.join()
def run_query(self, insert_values):
self.pool.map(
func=_multiproc_get,
iterable=(insert_values[n:n + self.concurrent] for n in range(0, len(insert_values), self.concurrent))
)
@classmethod
def _results_from_concurrent(cls, params):
execute_concurrent_with_args(cls.session, cls.prepared, list(params))
def _multiproc_get(params):
return CassandraQueryManager._results_from_concurrent(params)
Я звоню
qm = QueryManager(stmt)
qm.run_query(all_insert_vals) # a list of param iterables
Исключение указывает на: insert_values[n:n + self.concurrent] for n in range(0, len(insert_values), self.concurrent)
в классе QueryManager выше.