Apache Spark java.lang.ClassCastException при запуске forEachPartition на удаленном главном узле - PullRequest
1 голос
/ 25 октября 2019

У меня есть микросервис Java, который подключается к кластеру Apache Spark и использует соединитель Datastax Spark-Cassandra для сохранения данных в кластере Apache Cassandra DB.

Я написал следующий метод для удаления данных из Cassandraтаблица для определенного диапазона дат.

Точный код показан ниже:

public void deleteData(String fromDate, String toDate) {


    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");


    df.foreachPartition(partition -> {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
        }
        session.close();
    });
}

}

    @Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", port)
            .config("spark.sql.caseSensitive", false)
            .master(master)
            .getOrCreate();

Код выполняется нормально при запуске с использованием локального мастера зажиганияузел (опция .master("local[*]")).

Однако при попытке выполнить тот же код при подключении к удаленному главному узлу искры возникает следующая ошибка:

Трассировка стека драйверов:] с первопричиной java.lang.ClassCastException: невозможно назначить экземпляр java.lang.invoke.SerializedLambda для поля org.apache.spark.sql.Dataset $$ anonfun $ foreachPartition $ 2.func $ 4 типа org.apache.spark.api.java.function.ForeachPartitionFunction в экземпляре org.apache.spark.sql.Dataset $$ anonfun $ foreachPartition $ 2 в java.io.ObjectStreamClass $ FieldReflector.setObjFieldValues ​​(ObjectStreamClass.java:2287) в java.io.ObjectStreamClass.setObjFieldValues ​​(ObjectStreamClass.java:1417) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2293) в java.io.ObjectInerialStream.readSj () в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStio.87) jj.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573Stject.ject.ject.ject.f. ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.javOputStream. ) в Яве.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) при java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream. readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.readObject (ObjectInputStream.java:431) в org.apache.spark.serializer.JavaDeserializationStream.readObject (JavaSerializer.scala: 75) в org.apache.spark.JavaSerializerInstance.deserialize (JavaSerializer.scala: 114) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 83) в org.apache.spark.scheduler.Task.run (Task.scala: 123) вorg.apache.spark.executor.Executor $ TaskRunner $$ anonfun $ 10.apply (Executor.scala: 408) в org.apache.spark.util.Utils $ .tryWithSafeFinally (Utils.scala: 1360) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 414) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748) [pool-18-thread-1] ИНФОРМАЦИЯ com.datastax.spark.connector.cql.CassandraConnector- Отключен от кластера Кассандра: тестовый кластер

1 Ответ

1 голос
/ 26 октября 2019

Моя JAVA ненадежна, но вы можете попробовать извлечь лямбду в метод?

public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
        .filter(col("timestamp")
                .substr(1, 10)
                .between(fromDate, toDate))
        .select("nodeid");


    df.foreachPartition(new ForeachPartitionFunction<Row>() {
        public void call(Iterator<Row> partition) {
            Session session = connector.openSession();
            while (partition.hasNext()) {
                Row row = partition.next();
                session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
            }
            session.close();
        }
    });
}
...