Невозможно разделить Spark Streaming набор данных Kafka на два по значению столбца - PullRequest
0 голосов
/ 15 апреля 2019

Я пытаюсь разделить поток сообщений из одной темы Kafka по значению столбца, используя Spark Structured Streaming. После идентификации / разделения по значению столбца я хочу записать эти элементы в их собственную тему Kafka.

Кажется, что я могу только записать набор данных в один приемник kafka, и это последовательно первый объект, для которого я выполняю writeStream, в приведенном ниже коде он будет идентифицировать элементы по "ОШИБКЕ", но если я перееду что после того, как один идентифицирует элементы по "! ОШИБКЕ", то этот выписывает - но только один и первый объявлен.

Я считаю, что не могу повторно обработать первый DataStream, который я преобразовал в тип LogEntry, но я не знаю почему? или как это исправить? или как сделать это оптимально с точки зрения производительности и накладных расходов?

Извините, я просто изо всех сил пытаюсь найти полные примеры из реальной жизни (я видел больше кода для разделения и подсчета слов, чем мне когда-либо понадобилось бы). Я использую / изучаю последнюю версию Spark 2.4.1, и это еще одна проблема со старыми примерами.

...
// case class to read log messages into... 
case class LogEntry(dateTime:String, level:String, thread:String, location:String, msg:String)
...
// function to read the raw log message and convert to a LogEntry... 
    def parseLog(x:Row) : Option[LogEntry] = {
      val s = Option(x.getString(0)).getOrElse("")
      val matcher:Matcher = logPattern.matcher(s);
      if (matcher.matches()) {
        return Some(LogEntry(
        parseDateField(matcher.group(1)).getOrElse(""),
        matcher.group(2),
        matcher.group(3),
        matcher.group(4),
        matcher.group(5)
        ))
      } else {
        return None
      }
    }
... 



object LogTopicSplitter {

  def main(args: Array[String]) {
...

val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", consumerTopic)
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(value AS STRING)")


    val ds = stream.flatMap(parseLog).select("level","dateTime","msg")

    ds.filter($"level" === "ERROR")
      .selectExpr("CAST(msg AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation",s"$checkpointDir/split/err")
      .option("topic", producerErrTopic)
      .outputMode("append")
      .start()
      .awaitTermination()

    ds.filter(not($"level" === "ERROR"))
      .selectExpr("CAST(msg AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation",s"$checkpointDir/split/out")
      .option("topic", producerOutTopic)
      .outputMode("append")
      .start()
      .awaitTermination()

... 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...