Создавайте ежечасные снимки с помощью искровой потоковой передачи Kafka - PullRequest
0 голосов
/ 14 июля 2020

У меня есть вариант использования для создания ежечасного снимка потребляемых данных из Kafka topi c. Я использую структурированную потоковую передачу искр, чтобы получать данные из Kafka и печатать их на консоли, следуя стандартной документации. Я новичок в структурированной потоковой передаче и не знаю, как приступить к созданию ежечасных снимков. Может ли кто-нибудь помочь мне в этом?

Изменить: текущий статус

Предположим, что структура события, исходящего от кафки,

id || имя || position

, где id однозначно определяет запись. У меня может быть несколько событий для id, и я хочу рассмотреть последнее событие.

Предположим, я запустил Spark pipeline в 11:00. Я хочу объединить все события, которые происходят с 11:00 до 12:00 на основе метаданных Kafka, и создать снимок, который можно выгрузить в файл.

id || имя || pos

2  || ron  || 3.76
1  || can  || 2.68
4  || barn || 4.6

Точно так же я хочу агрегировать данные за каждый час и генерировать консолидированное представление за текущий час и выгружать его в файл.

Я только что написал basi c потребитель, который выводит все события на консоль

val df = spark.readStream.format("kafka")
  .option("localhost:9092").option("subscribe", "geo-location").option("failOnDataLoss", true)
  .load.selectExpr("CAST(value AS STRING)")
  .as(Encoders.STRING)

val query = df.writeStream
  .format("console")
  .option("truncate", "false")

Пожалуйста, дайте мне знать, если требуется дополнительная информация. Спасибо!

1 Ответ

0 голосов
/ 19 июля 2020

Вы можете использовать Trigger.ProcessingTime и установить его на 1 час. Это будет запускать ваш микропакет каждый час, который извлекает последние данные из Kafka.

Теперь, чтобы создать моментальный снимок с помощью этого микропакета, вам нужно будет выполнить дедупликацию на id и выбрать последнюю запись на основе метки времени. доступное поле в ваших данных.

df.writeStream
      .trigger(Trigger.ProcessingTime(1, TimeUnit.HOURS))
      .foreachBatch {
        (microBatch: DataFrame, batchId: Long) => {
          val snapshotDF = microBatch
            .withColumn("rnk",
              row_number().over(Window.partitionBy("id").orderBy(desc("timestampField")))
            ).filter("rnk = 1")

          // write snapshotDF to csv with timestamp appended to file name

        }
      }.start()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...