При выполнении следующего фрагмента кода я получаю исключение. Может кто-нибудь сказать мне, что не так с этим кодом?
JavaDStream<String> newDstream= newlines.window(Durations.seconds(300),Durations.seconds(120));
newDstream.print(); //This prints perfectly
newDstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<String> rdd) throws Exception {
//Logger.debug("RDD received: {}",rdd.collect());
Dataset<Row> df = sqlcontext.read().option("multiline", true).json(rdd);
df.printSchema();
df.show();
}
});
Ниже вывод df.printSchema
root
|-- priceData: struct (nullable = true)
| |-- close: string (nullable = true)
| |-- high: string (nullable = true)
| |-- low: string (nullable = true)
| |-- open: string (nullable = true)
| |-- volume: string (nullable = true)
|-- symbol: string (nullable = true)
|-- timestamp: string (nullable = true)
и вот исключение:
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast
to [B at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:81) at
org.apache.spark.scheduler.Task.run(Task.scala:108) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748)