NoHostAvailableException (хост не был опробован) с использованием Spark Cassandra Connector - PullRequest
0 голосов
/ 26 июня 2018

У меня проблема с соединителем DataStax Spark для Cassandra. Мое приложение содержит операцию Spark, которая выполняет ряд запросов на одну запись в базе данных Cassandra; некоторые из этих запросов будут успешными, но в какой-то момент один из запросов завершится с ошибкой NoHostAvailableException с сообщением All host(s) tried for query failed (no host was tried).

трассировка стека

2018-06-26 12:32:09 ERROR Executor:91 - Exception in task 0.3 in stage 0.0 (TID 6)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy15.execute(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy16.execute(Unknown Source)
    at [line that contains the session.execute() call]
    [...]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
    at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    ... 32 more

В попытке проанализировать эту проблему мне удалось воспроизвести ее в простой среде:

  • Одна машина с Кассандрой, мастером Spark и работником искры
  • Простая таблица, содержащая только 100 записей (10 разделов по 10 записей в каждом)

Ниже приведен минимальный код, с помощью которого я могу воспроизвести проблему.

код

val pkColumn1Value = 1L
val pkColumn2Values: Dataset[Long] = sparkSession.createDataset(1L to 19 by 2)
val connector: CassandraConnector = [...]

val results: Dataset[SimpleValue] = pkColumn2Values.mapPartitions { iterator =>
    connector.withSessionDo { session =>
        val clusteringKeyValues = Seq(...)

        val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")

        iterator.flatMap { pkColumn2Value =>
            val boundStatements = clusteringKeyValues.iterator.map(clusteringKeyValue =>
                preparedStatement.bind(
                    pkColumn1Value.asInstanceOf[AnyRef]
                    , pkColumn2Value.asInstanceOf[AnyRef]
                    , clusteringKeyValue.asInstanceOf[AnyRef]
                )
            )

            boundStatements.map { boundStatement =>
                val record = try {
                    session.execute(boundStatement).one()
                } catch {
                    case noHostAvailableException: NoHostAvailableException =>
                        log.error(s"Encountered NHAE, getErrors: ${noHostAvailableException.getErrors}")
                        throw noHostAvailableException
                    case exception =>
                        throw exception
                }

                log.error(s"Retrieved record $record")
                // Sleep to simulate an operation being performed on the value.
                Thread.sleep(100)

                record
            }
        }
    }
}

log.error(s"Perfunctory log statement that triggers an action: ${results.collect().last}")

Некоторые интересные вещи, которые я заметил

  • Я использую Dataset#mapPartitions(), чтобы подготовить оператор выбора только один раз для каждого раздела. Проблема исчезает, когда я проглатываю свою гордость и вместо этого использую Dataset#map() или Dataset#flatMap(), но я бы хотел использовать Dataset#mapPartitions() для (якобы) повышения производительности при подготовке запроса только один раз на раздел набора данных.
  • Кажется, что NoHostAvailableException происходит фиксированное количество времени после выполнения первого запроса. Некоторые исследования подтвердили, что это количество времени равно значению свойства соединителя spark.cassandra.connection.keep_alive_ms. Установка этого свойства на смехотворно высокое значение якобы решит проблему, но это похоже на грязный обходной путь вместо разумного решения.

В этой проблеме GitHub для коннектора комментатор pkolaczk упоминает потенциальную проблему, которая может привести к тому, что коннектору удастся установить свое первоначальное соединение с Cassandra и произойдет сбой при попытке впоследствии установить дополнительные соединения. Это звучит многообещающе, поскольку соответствует приведенным выше пунктам (которые предполагают, что проблема возникнет только после закрытия исходных соединений, что никогда не произойдет, если соединение будет восстановлено для каждого элемента в наборе данных по отдельности); однако мне не удалось найти никаких признаков того, что я неправильно сконфигурировал IP-адрес или любую другую вероятную причину этого явления (или даже подтверждение того, что это явление на самом деле является причиной проблемы).

Некоторые вещи, которые я проверял и / или пробовал

  • Многочисленные онлайн-источники предполагают, что NoHostAvailableException s всегда предшествуют другие ошибки. Я проверял свои журналы несколько раз, но не могу найти никаких других сообщений об ошибках или следов стека.
  • В ответе на другой вопрос StackOverflow предлагается вызвать NoHostAvailableException#getErrors, чтобы получить более подробное объяснение проблемы, но этот метод всегда возвращает пустую карту для меня.
  • Проблема остается, когда я использую СДР вместо наборов данных (включая тот факт, что это происходит только при использовании mapPartitions, а не при использовании map).
  • Свойство коннектора spark.cassandra.connection.local_dc изначально не было установлено. Установка для этого свойства соответствующего имени центра обработки данных не оказала заметного влияния на проблему.
  • Я попытался установить свойства соединителя spark.cassandra.connection.timeout_ms и spark.cassandra.read.timeout_ms до смехотворно высоких значений; это не оказало заметного влияния на проблему.

Некоторые номера версий

  • Spark : Воспроизведена проблема с 2.1.1 и 2.3.0
  • Кассандра : 3.11
  • Разъем : Воспроизведена проблема с 2.0.3 и 2.3.0
  • Scala : 2.11

Будем весьма благодарны за любые признаки того, что является причиной этих ошибок, или за идею о том, как решить проблему.

1 Ответ

0 голосов
/ 25 апреля 2019

Я отправил этот вопрос в группу пользователей Google соединителя (https://groups.google.com/a/lists.datastax.com/d/msg/spark-connector-user/oWrP7qeHJ7k/pmgnF_kbBwAJ),), где один из его участников подтвердил, что нет причин не указывать высокое значение для spark.cassandra.connection.keep_alive_ms. Я столкнулся до этого значения до такой степени, что я мог быть достаточно уверен, что никакие операции не пройдут, и с тех пор у меня не было проблем.

...