Драйвер Datastax Cassandra Асинхронная выборка результатов не работает - PullRequest
0 голосов
/ 22 февраля 2020

Я хочу загрузить большую строку данных, поэтому мой план состоит в том, чтобы разделить оператор на части, разделенные на метку времени, а затем выполнить его асинхронно.

...
// List to save ResultSets
List<CompletableFuture<AsyncResultSet>> pending = new ArrayList<>();

for(Range range : ranges) {
    System.out.println("Asynchronous execute query will be called soon!");
    pending.add(executeQuery(session, preparedStatement, range));
}

...

private static CompletableFuture<AsyncResultSet> executeQuery(CqlSession session, 
    PreparedStatement preparedStatement, Range range) {

return session
    .executeAsync(preparedStatement.bind()
        .setInstant("startDateTime", range.getStartDateTime().toInstant())
        .setInstant("endDateTime", range.getEndDateTime().toInstant())
        .setPageSize(1000000))
    .toCompletableFuture()
    .whenCompleteAsync((asyncResultSet, throwable) -> {
        if (throwable == null) {
            System.out.println("Range " + range.getStart() + " to " + range.getEnd() + 
                " has " + asyncResultSet.remaining() + " records.");

            fetchResultSet(asyncResultSet, throwable);

            if(asyncResultSet.hasMorePages()) {
                asyncResultSet.fetchNextPage().whenComplete(LoadCassandraAsync::fetchResultSet);
            }
        } else {
            throwable.printStackTrace();
        }
    }, Executors.newFixedThreadPool(4))
    .exceptionally(throwable -> {
        throwable.printStackTrace();
        return null;
    });
}

Я получу случайный код выхода 0 (не из основного метода), если он закрыт. Или я ничего не получу после некоторой выборки, точно так же, как работает поток, но ничего не делает.

Если я прокомментировал часть «выборки строк», я получил:

...
Asynchronous execute query will be called soon!
Asynchronous execute query will be called soon!
Asynchronous execute query will be called soon!
Asynchronous execute query will be called soon!
Range 2020-02-14 00:00:00+0700 to 2020-02-14 01:00:00+0700 has 102974 records.
Range 2020-02-14 01:00:00+0700 to 2020-02-14 02:00:00+0700 has 98201 records.
Range 2020-02-14 06:00:00+0700 to 2020-02-14 07:00:00+0700 has 104529 records.
Range 2020-02-14 08:00:00+0700 to 2020-02-14 09:00:00+0700 has 105257 records.
...

Я думаю, что это означает, что executeQuery() метод работал хорошо.

Что я сделал неправильно?

1 Ответ

0 голосов
/ 23 февраля 2020

В зависимости от количества запросов, которые вы, возможно, исчерпываете потоки cassandra - concurrent_reads (если я правильно помню, число по умолчанию - 250). Если вы проверите журналы (/var/log/cassandra/system.log), должно появиться сообщение, связанное с проблемой. Чтобы это исправить, добавьте искусственный Thread.wait после отправки, например, 200 запросов.

...