Получить максимальное, минимальное смещение от кадра данных kafka - PullRequest
0 голосов
/ 18 февраля 2019

Ниже показано, как я читаю данные из kafka.

 val inputDf = spark.readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", brokers)
 .option("subscribe", topic)
 .option("startingOffsets", """{"topic1":{"1":-1}}""")
 .load()

 val df = inputDf.selectExpr("CAST(value AS STRING)","CAST(topic AS STRING)","CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")

Как получить максимальные и минимальные смещения и временную метку с указанного выше кадра данных?Я хочу сохранить его в каком-то внешнем источнике для дальнейшего использования. Я не могу использовать функцию 'agg', так как пишу тот же кадр данных для записи (как показано ниже)

 val kafkaOutput = df.writeStream
  .outputMode("append")
  .option("path", "/warehouse/download/data1")
  .format("console")
  .option("checkpointLocation", checkpoint_loc)
  .start()
  .awaitTermination() 

1 Ответ

0 голосов
/ 19 февраля 2019

Если вы сможете обновить версию Spark до 2.4.0, вы сможете решить эту проблему.

В Spark 2.4.0 у вас есть API-интерфейс spark foreachbatch, через который вы можете записать один и тот же DataFrame в несколькораковины.

Spark.writestream.foreachbatch ((batchDF, batchId) => some_fun (batchDF)). start ()

some_fun (batchDF): {сохранить DF и выполнить агрегирование}

...