NoHostAvailableException - Искра-Кассандра-Соединитель - PullRequest
0 голосов
/ 07 июня 2018

Я использую spark-cassandra-connector_2.11 в версии 2.3.0.Запуск Latest Spark 2.3.0 Попытка чтения данных из Cassandra (3.0.11.1485) DSE (5.0.5).

Пример чтения, который работает без проблем:

 JavaRDD<Customer> result = javaFunctions(sc).cassandraTable(MyKeyspaceName, "customers", mapRowTo(Customer.class));

Еще одно чтение, котороеработает правильно: если я делаю из модульного теста - один поток - одно чтение следующим образом.

cassandraConnector.withSessionDo(new AbstractFunction1<Session, Void>() {
                @Override
                public Void apply(Session session) {
                   //Read something from Cassandra via Session - Works Fine Here as well.
                }
            });

Пример чтения (mapPartitions + withSessionDo) Проблемный код:

CassandraConnector cassandraConnector = CassandraConnector.apply(sc.getConf());

SomeSparkRDD.mapPartitions((FlatMapFunction<Iterator<Customer>, CustomerEx>) customerIterator ->
            cassandraConnector.withSessionDo(new AbstractFunction1<Session, Iterator<CustomerEx>>() {
                @Override
                public Iterator<CustomerEx> apply(Session session) {
                    return asStream(customerIterator, false)
                            .map(customer -> fetchDataViaSession(customer, session))
                            .filter(x -> x != null)
                            .iterator();
                }
            }));


public static <T> Stream<T> asStream(Iterator<T> sourceIterator, boolean parallel) {
    Iterable<T> iterable = () -> sourceIterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}

Некоторые итерацииof: map (customer -> fetchDataViaSession (customer, session)) работает, но большинство не работает с NoHostAvailableException.

Попробовал различные настройки без успеха:

spark.cassandra.connection.connections_per_executor_max
spark.cassandra.connection.keep_alive_ms
spark.cassandra.input.fetch.size_in_rows
spark.cassandra.input.split.size_in_mb

Also Tried to reduce the number of Partitions of the RDD which I do mapPartitions+withSessionDo on.

Ответы [ 2 ]

0 голосов
/ 08 июня 2018

Похоже, это решило это:

.set("spark.cassandra.connection.keep_alive_ms", "1200000")
0 голосов
/ 08 июня 2018

Проверьте, включен ли ваш кластер Cassandra по SSL.Если это так, я видел ту же ошибку, если вы не настроили правильный сертификат.

...