When joining two streams in Spark Structured Streaming 2.4.0, we intermittently get the exception below after a few micro-batches, and the job fails. What we found while searching for this exception says that we should convert all nullable String columns to non-nullable ones, but we do not know how to do that. Note: null checks on the join columns are performed before the join operation.
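For illustration only, this is the kind of change we assume the advice means; it is a sketch, not code from our job, and the path and schema are placeholders modeled on source1 in the snippet below. Wrapping a column in coalesce with a non-null default is one way we have seen suggested to make Spark report it as non-nullable:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit}

val spark = SparkSession.builder().getOrCreate()

// Placeholder stream with the same shape as source1 below.
val df = spark.readStream
  .format("parquet")
  .schema("col1 STRING, eventTime1 TIMESTAMP") // file streams require an explicit schema
  .load("/path/to/source1")

// coalesce with a non-null literal can never evaluate to null,
// so Spark marks the resulting column as non-nullable.
val nonNullable = df.withColumn("col1", coalesce(col("col1"), lit("")))
nonNullable.printSchema() // col1: string (nullable = false)

We are not sure whether this is the intended fix, or whether it is safe to substitute an empty string for the null values here.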
Code snippet
import org.apache.spark.sql.streaming.Trigger
val optionsMap1 = Map[String, String]("path" -> "/path/to/source1", "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false", "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1")
val optionsMap2 = Map[String, String]("path" -> "/path/to/source2", "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false", "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1")

// Register the two parquet file streams as temp views.
spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")

// Drop rows with null join keys, then apply a 30-minute watermark on each side.
spark.sql("select * from source1 where eventTime1 is not null and col1 is not null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
spark.sql("select * from source2 where eventTime2 is not null and col2 is not null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")

// Inner join on the key with a two-hour event-time range condition.
spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2 and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 hours").createTempView("join")

// Write the joined stream to parquet in append mode every 5 seconds.
val optionsMap3 = Map[String, String]("compression" -> "snappy", "path" -> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
spark.sql("select * from join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 seconds")).format("parquet").options(optionsMap3).start()
Exception
"20/05/07 03:04:27 ERROR executor.Executor: Exception in task 0.0 in stage 591.0 (TID 5881)
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$sp(StreamingSymmetricHashJoinExec.scala:338)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(StreamingSymmetricHashJoinExec.scala:323)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(StreamingSymmetricHashJoinExec.scala:323)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJoinExec.scala:323)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(StreamingSymmetricHashJoinExec.scala:361)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
A similar issue has also been reported in the Spark JIRA. REF: Link