NoNodeAvailableException после некоторого запроса на вставку в cassandra - PullRequest
1 голос
/ 28 апреля 2020

Я пытаюсь вставить данные в локальный кластер Cassandra, используя asyn c выполнение и версию 4 драйвера (так же, как мой экземпляр Cassandra)

Я создал экземпляр сеанса cql следующим образом:

CqlSession cqlSession = CqlSession.builder()
  .addContactEndPoint(new DefaultEndPoint(
    InetSocketAddress.createUnresolved("localhost",9042))).build();

Затем я создаю оператор в асинхронном режиме c:

return session.prepareAsync(
       "insert into table (p1,p2,p3, p4) values (?, ?,?, ?)")
          .thenComposeAsync(
              (ps) -> {
                 CompletableFuture<AsyncResultSet>[] result = data.stream().map(
                     (d) -> session.executeAsync(
                          ps.bind(d.p1,d.p2,d.p3,d.p4)
                       )
                  ).toCompletableFuture()
              ).toArray(CompletableFuture[]::new);
          return CompletableFuture.allOf(result);
      }
);

data - это динамический список c, заполненный данными пользователя.

Когда я выполняю c код, я получаю следующее исключение:

Caused by: com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query

    at com.datastax.oss.driver.api.core.AllNodesFailedException.fromErrors(AllNodesFailedException.java:53)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.sendRequest(CqlPrepareHandler.java:210)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.onThrottleReady(CqlPrepareHandler.java:167)
    at com.datastax.oss.driver.internal.core.session.throttling.PassThroughRequestThrottler.register(PassThroughRequestThrottler.java:52)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.<init>(CqlPrepareHandler.java:153)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.process(CqlPrepareAsyncProcessor.java:66)
    at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.process(CqlPrepareAsyncProcessor.java:33)
    at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:210)
    at com.datastax.oss.driver.api.core.cql.AsyncCqlSession.prepareAsync(AsyncCqlSession.java:90)

Узел активен, и некоторые данные вставляются до возникновения исключения. Я также попытался установить имя центра обработки данных в построителе сеансов без какого-либо результата.

Почему возникает это исключение, если узел запущен и работает? На самом деле у меня есть только один локальный узел, и это может быть проблемой?

Ответы [ 2 ]

1 голос
/ 06 мая 2020

Наконец, я нашел решение, использующее BatchStatement и небольшой пользовательский код для создания списка с измененными параметрами.

    int chunks = 0;
    if (data.size() % 100 == 0) {
      chunks = data.size() / 100;
    } else {
      chunks = (data.size() / 100) + 1;
    }

    final int finalChunks = chunks;

    return session.prepareAsync(
           "insert into table (p1,p2,p3, p4) values (?, ?,?, ?)")
            .thenComposeAsync(
                    (ps) -> {


                      AtomicInteger counter = new AtomicInteger();

                      final List<CompletionStage<AsyncResultSet>> batchInsert = data.stream()
                              .map(
                                      (d) -> ps.bind(d.p1,d.p2,d.p3,d.p4)

                              )
                              .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / finalChunks))
                              .values().stream().map(
                                      boundedStatements -> BatchStatement.newInstance(BatchType.LOGGED, boundedStatements.toArray(new BatchableStatement[0]))
                              ).map(
                                      session::executeAsync
                              ).collect(Collectors.toList());

                      return CompletableFutures.allSuccessful(batchInsert);
                    }
            );
1 голос
/ 28 апреля 2020

Самая большая вещь, которую я не вижу, это способ ограничить текущее количество активных асин c потоков.

В основном, если этот (сопоставленный) поток данных получает достаточно сильный удар, он Я в основном создам все эти новые темы, которые он ожидает. Если количество записей, поступающих из этих потоков, создает достаточное противодавление, которое узел не может поддерживать или догнать, узел будет перегружен и не будет принимать запросы.

Посмотрите на этот пост Райан Свихла из DataStax:

Cassandra: Пакетная загрузка без пакета - Nuanced Edition

Код взят из версии драйвера 3.x, но концепции такие же. По сути, предоставьте какой-нибудь способ уменьшить количество записей или ограничить число «потоковых потоков», запущенных в любой момент времени, и это должно сильно помочь.

...