У меня есть микросервис Java, который подключается к кластеру Apache Spark и использует соединитель Datastax Spark-Cassandra для сохранения данных в кластере Apache Cassandra DB.
Я написал следующий метод для удаления данных из Cassandraтаблица для определенного диапазона дат.
Точный код показан ниже:
public void deleteData(String fromDate, String toDate) {
SparkConf conf = sparkSession.sparkContext().getConf();
CassandraConnector connector = CassandraConnector.apply(conf);
Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
put("keyspace", CassandraProperties.KEYSPACE);
put("table", CassandraProperties.ENERGY_FORECASTS);
}}).load()
.filter(col("timestamp")
.substr(1, 10)
.between(fromDate, toDate))
.select("nodeid");
df.foreachPartition(partition -> {
Session session = connector.openSession();
while (partition.hasNext()) {
Row row = partition.next();
session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
}
session.close();
});
}
}
@Bean
public SparkSession sparkSession() {
return SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", host)
.config("spark.cassandra.connection.port", port)
.config("spark.sql.caseSensitive", false)
.master(master)
.getOrCreate();
Код выполняется нормально при запуске с использованием локального мастера зажиганияузел (опция .master("local[*]")
).
Однако при попытке выполнить тот же код при подключении к удаленному главному узлу искры возникает следующая ошибка:
Трассировка стека драйверов:] с первопричиной java.lang.ClassCastException: невозможно назначить экземпляр java.lang.invoke.SerializedLambda для поля org.apache.spark.sql.Dataset $$ anonfun $ foreachPartition $ 2.func $ 4 типа org.apache.spark.api.java.function.ForeachPartitionFunction в экземпляре org.apache.spark.sql.Dataset $$ anonfun $ foreachPartition $ 2 в java.io.ObjectStreamClass $ FieldReflector.setObjFieldValues (ObjectStreamClass.java:2287) в java.io.ObjectStreamClass.setObjFieldValues (ObjectStreamClass.java:1417) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2293) в java.io.ObjectInerialStream.readSj () в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStio.87) jj.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1573Stject.ject.ject.ject.f. ObjectInputStream.java:2287) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream.javOputStream. ) в Яве.io.ObjectInputStream.defaultReadFields (ObjectInputStream.java:2287) при java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:2211) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:2069) в java.io.ObjectInputStream. readObject0 (ObjectInputStream.java:1573) в java.io.ObjectInputStream.readObject (ObjectInputStream.java:431) в org.apache.spark.serializer.JavaDeserializationStream.readObject (JavaSerializer.scala: 75) в org.apache.spark.JavaSerializerInstance.deserialize (JavaSerializer.scala: 114) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 83) в org.apache.spark.scheduler.Task.run (Task.scala: 123) вorg.apache.spark.executor.Executor $ TaskRunner $$ anonfun $ 10.apply (Executor.scala: 408) в org.apache.spark.util.Utils $ .tryWithSafeFinally (Utils.scala: 1360) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 414) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748) [pool-18-thread-1] ИНФОРМАЦИЯ com.datastax.spark.connector.cql.CassandraConnector- Отключен от кластера Кассандра: тестовый кластер