Spark Structured Streaming: NullPointerException when joining two streams
1 vote
07 May 2020

When joining two streams in Spark Structured Streaming 2.4.0, we intermittently get the exception below in some micro-batches and the job fails. Searching for this exception turns up advice that all nullable String columns should be converted to non-nullable ones, but we do not know how to do that. Note: null checks on the join columns are performed before the join operation.
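If that advice means making the schema itself declare the String columns as non-nullable, the only approach we can think of is the sketch below. This is our own guess, not a verified fix: it relies on the standard coalesce nullability rule (coalesce is nullable only when all of its children are nullable), and the "__none__" sentinel is a placeholder of our own that should never appear in the output, since null rows are filtered out first.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Force a String column to be declared non-nullable in the schema:
// coalesce with a non-null literal is nullable only when all of its
// children are nullable, so the resulting column reports nullable = false.
// "__none__" is our own placeholder sentinel; it never shows up in the
// data because rows with nulls are filtered out beforehand.
def forceNotNull(df: DataFrame, column: String): DataFrame =
  df.filter(col(column).isNotNull)
    .withColumn(column, coalesce(col(column), lit("__none__")))

This only covers the String join key (col1 / col2); the timestamp columns would need a different sentinel, and we have not verified whether this actually makes the NPE go away.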

Code snippet

val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1", "maxFilesPerTrigger" ->  "1", "latestFirst" -> "false", "fileNameOnly" ->"false", "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1" )
val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2", "maxFilesPerTrigger" ->  "1", "latestFirst" -> "false", "fileNameOnly" ->"false", "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1" )
spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
spark.sql("select * from source1 where eventTime1 is not null and col1 is not null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
spark.sql("select * from source2 where eventTime2 is not null and col2 is not null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
spark.sql("select * from viewNotNull1 a join  viewNotNull2 b on a.col1 = b.col2 and a.eventTime1 >= b.eventTime2 and  a.eventTime1 <= b.eventTime2 + interval 2 hours").createTempView("join")
val optionsMap3 = Map[String, String]("compression" -> "snappy","path" -> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
spark.sql("select * from join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 seconds")).format("parquet").options(optionsMap3).start()    

Exception

"20/05/07 03:04:27 ERROR executor.Executor: Exception in task 0.0 in stage 591.0 (TID 5881)
java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
        at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
        at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
        at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$sp(StreamingSymmetricHashJoinExec.scala:338)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(StreamingSymmetricHashJoinExec.scala:323)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(StreamingSymmetricHashJoinExec.scala:323)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
        at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJoinExec.scala:323)
        at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(StreamingSymmetricHashJoinExec.scala:361)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

A similar issue has also been reported in the Spark JIRA. REF: Link

...