(Datastax 4.1.0) (Cassandra) Как собрать все ответы с помощью session.executeAsyn c? - PullRequest
0 голосов
/ 08 февраля 2020

Я хочу сделать asyn c вызов cassandra db с execute. Asyn c вызов в мануэле. Я нашел этот код, но не мог понять, как собрать все строки в любом списке. На самом деле basi c вызовите как Select * из таблицы, и я хочу сохранить все результаты.

https://docs.datastax.com/en/developer/java-driver/4.4/manual/core/async/

CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();

// Chain one async operation after another:
CompletionStage<AsyncResultSet> responseStage =
    sessionStage.thenCompose(
        session -> session.executeAsync("SELECT release_version FROM system.local"));

// Apply a synchronous computation:
CompletionStage<String> resultStage =
    responseStage.thenApply(resultSet -> resultSet.one().getString("release_version"));

// Perform an action once a stage is complete:
resultStage.whenComplete(
    (version, error) -> {
      if (error != null) {
        System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
      } else {
        System.out.printf("Server version: %s%n", version);
      }
      sessionStage.thenAccept(CqlSession::closeAsync);
    });

Ответы [ 2 ]

0 голосов
/ 10 февраля 2020

Вот пример для 4.x (вы также можете найти образец для реактивного кода, доступного от 4.4 BTW)

https://github.com/datastax/cassandra-reactive-demo/blob/master/2_async/src/main/java/com/datastax/demo/async/repository/AsyncStockRepository.java

0 голосов
/ 09 февраля 2020

Вам нужно обратиться к разделу об асинхронной подкачке - вам необходимо предоставить обратный вызов, который будет собирать данные в список, предоставленный как внешний объект. Документация имеет следующий пример:

CompletionStage<AsyncResultSet> futureRs =
    session.executeAsync("SELECT * FROM myTable WHERE id = 1");
futureRs.whenComplete(this::processRows);

void processRows(AsyncResultSet rs, Throwable error) {
  if (error != null) {
    // The query failed, process the error
  } else {
    for (Row row : rs.currentPage()) {
      // Process the row...
    }
    if (rs.hasMorePages()) {
      rs.fetchNextPage().whenComplete(this::processRows);
    }
  }
}

в этом случае processRows может хранить данные в списке, который является частью текущего объекта, что-то вроде этого:

class Abc {
  List<Row> rows = new ArrayList<>();

  // call to executeAsync

  void processRows(AsyncResultSet rs, Throwable error) {
....
    for (Row row : rs.currentPage()) {
      rows.add(row);
    }
....

  }
}

, но вам нужно быть очень осторожным с select * from table, так как он может вернуть много результатов, плюс он может прерваться, если у вас слишком много данных - в этом случае лучше выполнить сканирование диапазона токенов (у меня есть пример для драйвера 3.x , но для 4.x пока нет).

...