Кассандра бросает соединение с драйвером Java noHostAvailableException - PullRequest
0 голосов
/ 21 сентября 2018

У меня есть кластер кассандры с двумя узлами. Я настроил задание spark для запроса из этого кластера кассандры, у которого 3651568 ключей.

import com.datastax.spark.connector.rdd.ReadConf
import org.apache.spark.sql.cassandra
import org.apache.spark.sql.SparkSession

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "hostname)
val sc = new SparkContext(conf)

val spark = SparkSession.builder().master("local").appName("Spark_Cassandra").config("spark.cassandra.connection.host", "hostname").getOrCreate()
val studentsDF = spark.read.cassandraFormat("keyspacename", "tablename").options(ReadConf.SplitSizeInMBParam.option(32)).load()
studentsDF.show(1000)

Я могу запросить первые 1000 строкно я не могу найти способ чтения из 1001th строки в 2000-ю строку, чтобы я мог читать данные из таблицы Cassandra с помощью искры.

в соответствии с рекомендацией, которую я начал использовать java драйвер

вот полное объяснение

Мне нужно запросить из базы данных Cassandra с помощью Java-драйвера datastax .. Я использую версию Java-драйвера datastax cassandra-java-driver-3.5.1 и версию Apache-Cassandra apache-cassandra-3.0.9, и я попыталсяРешая зависимости, установив jar, я также проверил, что все файлы yaml, listen_address, rpc_address указывают на мой хост, а start_native_transport установлен в true. Вот мой код java для установления соединения с базой данных cassandra. *

import java.net.InetAddress;
  import com.datastax.driver.core.Metadata;
  import java.net.UnknownHostException;
  import com.datastax.driver.core.Cluster;
  import com.datastax.driver.core.Cluster.Builder;
  import com.datastax.driver.core.Session;
  import com.datastax.driver.core.ResultSet;
  import com.datastax.driver.core.Row;
public class Started {
    public void connect()
    {
     try
       {
         Cluster cluster;
         Session session;
         cluster = Cluster.builder().addContactPoints("***.***.*.*").build();
       cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(2000);
         System.out.println("Connected to cluster:");
         session= cluster.connect("demo");
         Row row = session.execute("SELECT ename FROM demo.emp").one();
         System.out.println(row.getString("ename"));
         cluster.close();
        }
          catch (Exception e) {
              e.printStackTrace();
              }
           }
    public static void main(String[] args)
     {
       Started st = new Started();
       st.connect();
       }
          }

`

У меня есть только один узел в кластере кассандры, и он работаетнгя могу подключиться к нему также на порт 9042 ... пока все хорошо, но когда я запускаю свою программу Java, я получаю сообщение об ошибке или исключении ...

Connected to cluster:
`

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /***.***.*.*:9042 (com.datastax.driver.core.exceptions.TransportException: [/***.***.*.*:9042] Cannot connect))
            at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:232)
            at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
            at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1631)
            at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1549)
            at com.datastax.driver.core.Cluster.init(Cluster.java:160)
            at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:342)
            at com.datastax.driver.core.Cluster.connect(Cluster.java:292)
            at Started.connect(Started.java:22)
            at Started.main(Started.java:34)

`

Может кто-нибудь, пожалуйста, помогите!!

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

Это проблема совместимости драйверов.Первоначально я использовал cassandra-java-driver-3.5.1 и apache-cassandra-3.0.9.переключиться на cassandra-java-driver-3.0.8 и apache-cassandra-3.0.9, а также установить несколько jar-файлов: slf4j-log4j12-1.7.7.jar, log4j-1.2.17.jar, netty-all-4.0.39.Final.jar .. у меня отлично работает:)

0 голосов
/ 21 сентября 2018

Это может плохо подойти для Spark.Показывать, например, просто показывает 1000 записей, но порядок записей не гарантируется.Многократные вызовы могут привести к разным результатам.

Лучшая ставка в Spark - это, вероятно, получить результаты в качестве локального итератора, если вы хотите просмотреть их, но опять же, вероятно, это не лучший способ сделать что-то.Spark - это система для работы с данными на удаленном кластере.Это будет означать, что вы выполняете обработку в рамках API-интерфейса dataframe.

Если вы действительно просто хотите медленно пролистывать записи, вы можете использовать toLocalIterator для получения пакетов обратно на компьютер с драйвером (не рекомендуется).Но вы можете сделать что-то похожее, просто выполнив Select (*) с помощью драйвера Java.Итератор набора результатов, который вам возвращается, автоматически просматривает результаты по мере их продвижения.

Пример использования подкачки драйвера Java

https://docs.datastax.com/en/developer/java-driver/3.2/manual/paging/

ResultSet rs = session.execute("your query");
  for (Row row : rs) {
  // Process the row ...
  // By default this will only pull a new "page" of data from cassandra
  // when the previous page has been fully iterated through. See the
  // docs for more details    
}

Пример удаленной обработки данных с помощью Spark

Документы RDD для Cassandra Документы с кадрами данных для Cassandra // API RDD sparkContext.cassandraTable ("ks", "tab") .foreach (row => // processRow)

//Dataframe API - although similar foreach is available here as well
spark.read.format("org.apache.spark.sql.cassandra")
  .load()
  .select(//do some transforms)
  .write(//pickoutput of request)

Пример использования для localIterator, вероятно, наименее релевантный метод

Почему вы можете захотеть сделать это с примером

// This reads all data in large blocks to executors, those blocks are then pulled one at a time back to the Spark Driver.
sparkContext.cassandraTable("ks","tab").toLocalIterator
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...