Cassandra execute_async запрос потерять данные - PullRequest
0 голосов
/ 01 сентября 2018

Мне нужно вставить огромный объем данных с помощью драйвера Python DataStax для Cassandra. В результате я не могу использовать запрос execute (). execute_async () намного быстрее.

Но я столкнулся с проблемой потери данных при вызове execute_async (). Если я использую execute (), все в порядке. Но, если я использую execute_async () (для тех же самых запросов вставки), только около 5-7% моего запроса выполнялись правильно (и никаких ошибок не было). И в случае, если я добавляю time.sleep (0.01) после каждого из 1000 запросов на вставку (с помощью execute_async ()), это снова нормально.

Нет потери данных (случай 1):

for query in queries:
    session.execute( query )

Нет потери данных (случай 2):

counter = 0
for query in queries:
    session.execute_async( query )
    counter += 1
    if counter % 1000 == 0:
        time.sleep( 0.01 )

Потеря данных:

for query in queries:
    session.execute_async( query )

Есть ли причина, по которой это может быть?

Кластер имеет 2 узла

[cqlsh 5.0.1 | Кассандра 3.11.2 | CQL spec 3.4.4 | Собственный протокол v4]

DataStax Python драйвер версии 3.14.0

Python 3.6

1 Ответ

0 голосов
/ 02 сентября 2018

Поскольку execute_async является неблокирующим запросом, ваш код не ожидает завершения запроса, прежде чем продолжить. Причина, по которой вы, вероятно, не наблюдаете потери данных при добавлении 10 мс sleep после каждого выполнения, заключается в том, что это дает достаточно времени для обработки запросов до того, как вы прочитаете данные обратно.

Вам нужно что-то в вашем коде, которое ожидает завершения запросов перед считыванием данных, т. Е .:

futures = []
for query in queries:
    futures.push(session.execute(query))

for f in futures:
    f.result() # blocks until query is complete

Возможно, вы захотите оценить, используя execute_concurrent для отправки многих запросов и чтобы драйвер управлял уровнем параллелизма для вас.

...