Я пытаюсь портировать приложение Apache Flink (scala) на структурированную потоковую передачу Spark.Основная работа приложения:
- Чтение сообщений из kafka
- Выполнение некоторых преобразований / обработка
- Вывод нуля или более сообщений в kafka
Во время обработки я хочу выводить сообщения журнала (общая информация об обработке, ошибки синтаксического анализа, ...).Однако, если исходить от Flink, обработка будет выполняться одним или несколькими .map
операторами, которые работают с моими Dataset[Node]
/ Dataset[MyCaseClass]
объектами.К сожалению, внутри этих операторов все должно быть сериализуемым, что неверно для моего регистратора (используя scala-logging
).
Таким образом, при попытке использовать регистратор я получаю: org.apache.spark.SparkException: Task not serializable
.
Пример:
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", host + ":" + port)
.option("subscribe", topic)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.map(n =>
{
// processing here
log.warn("bla") // <-- no-go
<root></root>.asInstanceOf[Node]
})
.map(_.toString())
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", host.get + ":" + port.get)
.option("topic", topic.get)
.option("checkpointLocation", "myDir")
.start()
.awaitTermination()
Какой рекомендуемый способ выполнения таких действий, как ведение журнала, который не сериализуем?В Flink есть опция для создания подкласса RichMapFunction
и аналогичных классов, где вы можете поместить все несериализуемые вещи, и они будут созданы для каждого оператора / параллелизма.