Ведение журнала в потоковой структуре с искрой / SparkException: задача не сериализуется - PullRequest
0 голосов
/ 26 февраля 2019

Я пытаюсь портировать приложение Apache Flink (scala) на структурированную потоковую передачу Spark.Основная работа приложения:

  • Чтение сообщений из kafka
  • Выполнение некоторых преобразований / обработка
  • Вывод нуля или более сообщений в kafka

Во время обработки я хочу выводить сообщения журнала (общая информация об обработке, ошибки синтаксического анализа, ...).Однако, если исходить от Flink, обработка будет выполняться одним или несколькими .map операторами, которые работают с моими Dataset[Node] / Dataset[MyCaseClass] объектами.К сожалению, внутри этих операторов все должно быть сериализуемым, что неверно для моего регистратора (используя scala-logging).

Таким образом, при попытке использовать регистратор я получаю: org.apache.spark.SparkException: Task not serializable.

Пример:

    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", host + ":" + port)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map(n =>
      {
        // processing here

        log.warn("bla")      // <-- no-go

        <root></root>.asInstanceOf[Node]
      })
      .map(_.toString())
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", host.get + ":" + port.get)
      .option("topic", topic.get)
      .option("checkpointLocation", "myDir")
      .start()
      .awaitTermination()

Какой рекомендуемый способ выполнения таких действий, как ведение журнала, который не сериализуем?В Flink есть опция для создания подкласса RichMapFunction и аналогичных классов, где вы можете поместить все несериализуемые вещи, и они будут созданы для каждого оператора / параллелизма.

1 Ответ

0 голосов
/ 26 февраля 2019

Если вы хотите ввести несериализуемые объекты в оператор карты spark, например, соединение с базой данных, вы всегда можете воспользоваться функцией mapPartition.

mapPartition(iter => {
    val log = LoggerFactory.getLogger
    iter.map(row => {
        ....
    })
})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...