Execute_concurrent Кассандры не работает должным образом - PullRequest
0 голосов
/ 17 января 2019

Я написал скрипт на Python, который читает csv и вставляет их в таблицу Cassandra, используя как асинхронный, так и параллельный режим, но параллельный медленнее асинхронного. Моя цель - использовать параллельные записи - добиться параллельной записи и, следовательно, ускорить задачу индексации файла csv в cassandra.

Код с использованием асинхронного:

for df in chunks:
            futures = []
            df = df.to_dict(orient='records')
            chunk_counter += 1
            for row in df:
                key = str(row["0"])
                row = json.dumps(row, default=str)
                futures.append(
                    self.session.execute_async(
                        insert_sql, [key, "version_1", row]
                    )
                )
                # batch.add(insert_sql, (key, "version_1", row))
            # self.session.execute(batch)
            for future in futures:
                self.log.debug(future)
                continue

Код с использованием одновременных:

for df in chunks:
            futures = []
            df = df.to_dict(orient='records')
            chunk_counter += 1
            for row in df:
                key = str(row["0"])
                row = json.dumps(row, default=str)
                params = (key, row, )
                futures.append(
                    (
                        insert_sql,
                        params
                    )
                )
            results = execute_concurrent(
                self.session, futures, raise_on_first_error=False)
            for (success, result) in results:
                if not success:
                    self.handle_error(result)  # result will be an Exception

1 Ответ

0 голосов
/ 17 января 2019

Вы не устанавливаете параметр concurrency для execute_concurrent, и по умолчанию он использует 100.

Из документации :

Параметр одновременность контролирует, сколько операторов будет выполняться одновременно. Когда для Cluster.protocol_version установлено значение 1 или 2 , рекомендуется, чтобы это значение было ниже 100-кратного числа подключений к ядру на хост, умноженного на количество подключенных хостов (см. Cluster.set_core_connections_per_host()). Если это количество превышено, поток цикла событий может попытаться заблокировать создание нового соединения, что существенно повлияет на пропускную способность. Если protocol_version 3 или выше, вы можете безопасно экспериментировать с более высокими уровнями параллелизма.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...