Spark Структурированная потоковая передача groupByKey на временном окне не работает - PullRequest
0 голосов
/ 05 марта 2019

Мне нужно пакетировать мой поток Kafka во временные окна по 10 минут каждое, а затем запустить некоторую пакетную обработку для него.

Примечание: записи ниже имеют поле метки времени

   val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerPool)
      .option("subscribe", topic)
      .option("startingOffsets", kafkaOffset)
      .load()

Iдобавить временное окно к каждой записи, используя,

.withColumn("window", window($"timing", windowDuration))

Я создал несколько вспомогательных классов, таких как

  case class TimingWindow(
      start: java.sql.Timestamp,
      end: java.sql.Timestamp
  )
  case class RecordWithWindow(
      record: MyRecord,
      groupingWindow: TimingWindow
  )

Теперь у меня есть DF типа [RecordWithWindow]

Всеэто работает очень хорошо.

Далее,

metricsWithWindow
  .groupByKey(_.groupingWindow)
  //By grouping, I get several records per time window
  //resulting an object of the below type which I write out to HDFS

  case class WindowWithRecords(
      records: Seq[MyRecord],
      window: TimingWindow
  )

Где я исследую HDFS,

Пример :

Ожидаемый : каждый объект WindowWithRecords имеет уникальный TimingWindow

WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB, MyRecordC))

Actual : более одного объекта WindowWithRecords с одним и тем же TimingWindow

WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB))
WindowWithRecordsB(TimingWindowA, Seq(MyRecordC))

выглядит какЛогика groupByKey не работает.

Надеюсь, мой вопрос понятен.Любые указатели будут полезны.

1 Ответ

0 голосов
/ 12 марта 2019

Обнаружена проблема:

Я не использовал явный триггер при обработке окна.В результате Spark создавал микропартии как можно скорее, а не в конце окна.

streamingQuery
.writeStream
.trigger(Trigger.ProcessingTime(windowDuration))
...
.start

Это было результатом моего неправильного понимания документации Spark.

Примечание: groupByKey использует хэш-код объекта.Важно убедиться, что хеш-код объекта согласован.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...