Cassandra многопроцессная вставка Python не удается при использовании SSL - PullRequest
0 голосов
/ 19 декабря 2018

Я обновляю проект с Python 2.7 до 3.6 и использую cassandra-driver версию 3.14 для подключения к базе данных Cassandra и запускаю исключение при выполнении операторов вставки.

Хороший ресурс предназначен для выполнения многопроцессорных параллельных запросов, которые хорошо работали в Python 2.7 с использованием SSL-соединения с Cassandra.Однако при запуске этой реализации через соединение SSL в 3.6 (из контейнера Docker, базовый образ python 3.6-stretch) я периодически получаю сообщение об ошибке:

File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 670, in get
'Unable to complete the operation against any hosts', {}

при выполнении операторов вставки.Отключение SSL (или запуск через SSL в Python 2.7) решает проблему.Мне интересно, есть ли что-то очевидное, что я здесь упускаю?

Мой объект соединения:

    ssl_opts = {
        'ca_certs': os.path.join(...),
        'keyfile': os.path.join(...),
        'certfile': os.path.join(...),
        'cert_reqs': CERT_REQUIRED
    }

    #  connection object contains host, port, username, password

    auth_provider = PlainTextAuthProvider(
        username=connection.username,
        password=connection.password
    )

    # .....
    # the following Cluster object is returned
    # from a build_cassandra_cluster() method:
    return Cluster(
        contact_points=connection.host,
        port=connection.port,
        ssl_options=ssl_opts,
        auth_provider=auth_provider,
        protocol_version=4,
        load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())
    )

Объект QueryManager выглядит следующим образом:

class QueryManager:
    session = None
    prepared = None
    concurrent = CASSANDRA_IN_FLIGHT_MAX

    def __init__(self, stmt):
        self.pool = Pool(processes=cpu_count(), initializer=self._setup, initargs=(stmt,))

    @classmethod
    def _setup(cls, stmt):
        cluster = build_cassandra_cluster()
        cls.session = cluster.connect("my_keyspace")
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare(stmt)

    def close_pool(self):
        self.pool.close()
        self.pool.join()

    def run_query(self, insert_values):
        self.pool.map(
            func=_multiproc_get,
            iterable=(insert_values[n:n + self.concurrent] for n in range(0, len(insert_values), self.concurrent))
        )

    @classmethod
    def _results_from_concurrent(cls, params):
        execute_concurrent_with_args(cls.session, cls.prepared, list(params))


def _multiproc_get(params):
    return CassandraQueryManager._results_from_concurrent(params)

Я звоню

qm = QueryManager(stmt)
qm.run_query(all_insert_vals)  # a list of param iterables

Исключение указывает на: insert_values[n:n + self.concurrent] for n in range(0, len(insert_values), self.concurrent) в классе QueryManager выше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...