Я пытаюсь разделить поток сообщений из одной темы Kafka по значению столбца, используя Spark Structured Streaming. После идентификации / разделения по значению столбца я хочу записать эти элементы в их собственную тему Kafka.
Кажется, что я могу только записать набор данных в один приемник kafka, и это последовательно первый объект, для которого я выполняю writeStream, в приведенном ниже коде он будет идентифицировать элементы по "ОШИБКЕ", но если я перееду что после того, как один идентифицирует элементы по "! ОШИБКЕ", то этот выписывает - но только один и первый объявлен.
Я считаю, что не могу повторно обработать первый DataStream, который я преобразовал в тип LogEntry, но я не знаю почему? или как это исправить? или как сделать это оптимально с точки зрения производительности и накладных расходов?
Извините, я просто изо всех сил пытаюсь найти полные примеры из реальной жизни (я видел больше кода для разделения и подсчета слов, чем мне когда-либо понадобилось бы). Я использую / изучаю последнюю версию Spark 2.4.1, и это еще одна проблема со старыми примерами.
...
// case class to read log messages into...
case class LogEntry(dateTime:String, level:String, thread:String, location:String, msg:String)
...
// function to read the raw log message and convert to a LogEntry...
def parseLog(x:Row) : Option[LogEntry] = {
val s = Option(x.getString(0)).getOrElse("")
val matcher:Matcher = logPattern.matcher(s);
if (matcher.matches()) {
return Some(LogEntry(
parseDateField(matcher.group(1)).getOrElse(""),
matcher.group(2),
matcher.group(3),
matcher.group(4),
matcher.group(5)
))
} else {
return None
}
}
...
object LogTopicSplitter {
def main(args: Array[String]) {
...
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", consumerTopic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING)")
val ds = stream.flatMap(parseLog).select("level","dateTime","msg")
ds.filter($"level" === "ERROR")
.selectExpr("CAST(msg AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("checkpointLocation",s"$checkpointDir/split/err")
.option("topic", producerErrTopic)
.outputMode("append")
.start()
.awaitTermination()
ds.filter(not($"level" === "ERROR"))
.selectExpr("CAST(msg AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("checkpointLocation",s"$checkpointDir/split/out")
.option("topic", producerOutTopic)
.outputMode("append")
.start()
.awaitTermination()
...