Проблемы искровой структурированной потоковой передачи в Кубернетесе - PullRequest
0 голосов
/ 12 октября 2019

Я получаю предупреждения и ошибки при запуске структурированной потоковой передачи на кластере K8s. Вот часть моих кодов:

  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokerServers)
    .option("subscribe", topicName)
    .load()

  val query = df.writeStream
    .outputMode("append")
    .foreachBatch((batchDF: DataFrame, batchId: Long) => {

      val rstDF = batchDF.select($"value")
        .map(row => valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString)
        .transform(runner.spark.read.json)
        .transform(trimDF)

      println(s"Batch $batchId: ${rstDF.count} rows")
      rstDF.show(false)

    })
    .trigger(Trigger.ProcessingTime("120 seconds"))
    .start()

  query.awaitTermination()

Первая партия (партия 0) в порядке. Однако, когда данные поступили для пакета 1, я получил предупреждения о задаче, потерянной из-за java.lang.NullPointerException.

...
19/10/12 02:02:18 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
...
19/10/12 02:02:18 INFO DAGScheduler: ResultStage 2 (start at MergeKafkaToDelta.scala:124) failed in 17.980 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...
19/10/12 02:02:18 ERROR MicroBatchExecution: Query [id = e1f15e44-ad17-452d-97cf-def26f729f38, runId = c0b7c2ba-fca4-4538-8095-cbe2daeec525] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...

Вы знаете его коренную причину? Как я могу настроить конфигурации и параметры для spark-submit? У меня есть блог, связанный с ним: Контрольная точка Spark Streaming на Kubernetes

Однако она основана на конкретной облачной платформе. :-( Знаете ли вы общие решения?

1 Ответ

0 голосов
/ 14 октября 2019

Похоже, спарк-на-k8s-оператор является решением: Spark Структурированные потоковые приложения на Kubernetes . И есть новая версия для spark 2.4.4: spark-on-k8s-operator

Так что я работаю над этим и пытаюсь разработать пример для структурированной потоковой передачи.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...