I'm working with Spark 2.3.1, Kafka 1.1 and Python 2.7.9, and I can't upgrade any of them.
I ran into a problem when trying to use Spark Structured Streaming to push (or pull) data to (from) Kafka. When there is no data left in the Kafka topic (either because it has run out or because it never arrives), Spark fails after 4 attempts.
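For context, the job is essentially the following PySpark sketch (the broker address, topic names and checkpoint path are placeholders, not my real values, and the shell is started with the spark-sql-kafka-0-10 package):

# pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-stream-test").getOrCreate()

# read a stream from one Kafka topic
df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")   # placeholder broker
        .option("subscribe", "my_input_topic")             # placeholder topic
        .load())

# write it back out to another Kafka topic
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
           .writeStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "kafka:9092")
           .option("topic", "my_output_topic")             # placeholder topic
           .option("checkpointLocation", "/tmp/checkpoint")
           .start())

query.awaitTermination()

The failure in the executor log looks like this: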
2020-02-10 12:52:51 WARN TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, 172.21.0.4, executor 1): java.util.concurrent.TimeoutException: Cannot fetch record for offset 4562638 in 120000 milliseconds
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
... x4
2020-02-10 12:58:52 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 4 times; aborting job
2020-02-10 12:58:52 ERROR WriteToDataSourceV2Exec:70 - Data source writer org.apache.spark.sql.execution.streaming.sources.InternalRowMicroBatchWriter@306b28ba is aborting.
2020-02-10 12:58:52 ERROR WriteToDataSourceV2Exec:70 - Data source writer org.apache.spark.sql.execution.streaming.sources.InternalRowMicroBatchWriter@306b28ba aborted.
2020-02-10 12:58:52 ERROR MicroBatchExecution:91 - Query [id = 45351c37-ea34-414e-8712-52ab2adeb4d3, runId = b9ad9075-0b1e-419d-ac57-212d3d7fed71] terminated with error
org.apache.spark.SparkException: Writing job aborted.
I found a possible solution here, but I don't know how to apply it from the PySpark shell.
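Concretely, I don't know where such a setting is supposed to go in PySpark: as a --conf when launching the shell, or as an .option(...) on the Kafka source? Something like the sketch below is what I have in mind; the option names kafkaConsumer.pollTimeoutMs and failOnDataLoss are my own guesses at what might need to change, not the solution from the link:

# guess 1: raise the network timeout when starting the shell
#   pyspark --conf spark.network.timeout=300s \
#           --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1

# guess 2: pass options on the Kafka source itself
df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")     # placeholder broker
        .option("subscribe", "my_input_topic")               # placeholder topic
        .option("kafkaConsumer.pollTimeoutMs", "300000")     # raise the 120000 ms fetch timeout?
        .option("failOnDataLoss", "false")                   # don't abort when offsets can't be fetched?
        .load())

Is either of these the right place to put it, or does the fix have to be applied somewhere else entirely?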