Как включить значение метки времени Кафки как столбцы в потоковой передаче с искрой? - PullRequest
0 голосов
/ 22 января 2019

Я ищу решение для добавления значения временной метки kafka в мою схему структурированной потоковой передачи Spark.Я извлек поле значения из кафки и создания dataframe.Моя проблема в том, что мне нужно получить поле метки времени (от kafka) вместе с другими столбцами.

Вот мой текущий код:

val kafkaDatademostr = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
  .option("subscribe","csvstream")
  .load

val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
  .select("csv.*")

val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
    "split(value,',')[1] as DFW",
    "split(value,',')[2] as DTG",
    "split(value,',')[3] as CDF",
    "split(value,',')[4] as DFO",
    "split(value,',')[5] as SAD",
    "split(value,',')[6] as DER",
    "split(value,',')[7] as time_for",
    "split(value,',')[8] as fort")

Как я могу получить метку времени иза кафки добавить как столбцы вместе с другими столбцами?

Ответы [ 3 ]

0 голосов
/ 22 января 2019

На официальной веб-странице Apache Spark вы можете найти руководство: Структурированное потоковое вещание + Руководство по интеграции Kafka (версия брокера Kafka 0.10.0 или выше)

Там вы можете найти информацию о схеме DataFrame, загруженной из Kafka.

Каждая строка из источника Кафки имеет следующие столбцы:

  • ключ - ключ сообщения
  • значение - значение сообщения
  • topic - название темы сообщения
  • раздел - разделы, с которых пришло это сообщение
  • смещение - смещение сообщения
  • отметка времени - отметка времени
  • timestampType тип отметки времени

Все вышеперечисленные столбцы доступны для запроса. В вашем примере вы используете только value, поэтому для получения метки времени нужно просто добавить timestamp в оператор select:

  val allFields = kafkaDatademostr.selectExpr(
    s"CAST(value AS STRING) AS csv",
    s"CAST(key AS STRING) AS key",
    s"topic as topic",
    s"partition as partition",
    s"offset as offset",
    s"timestamp as timestamp",
    s"timestampType as timestampType"
  )
0 голосов
/ 06 апреля 2019

В моем случае с Кафкой я получал значения в формате JSON.Который содержит фактические данные вместе с исходным временем события, а не меткой времени Кафки.Ниже приведена схема.

val mySchema = StructType(Array(
      StructField("time", LongType),
      StructField("close", DoubleType)
    ))

Чтобы использовать функцию водяные знаки структурированной потоковой передачи Spark, мне пришлось преобразовать поле time в формат отметки времени.

val df1 = df.selectExpr("CAST(value AS STRING)").as[(String)]
      .select(from_json($"value", mySchema).as("data"))
      .select(col("data.time").cast("timestamp").alias("time"),col("data.close"))

Теперь вы можете использовать поле времени для оконной операции , а также водяных знаков предназначения.

import spark.implicits._
val windowedData = df1.withWatermark("time","1 minute")
                      .groupBy(
                          window(col("time"), "1 minute", "30 seconds"),
                          $"close"
                      ).count()

Надеюсь, этот ответ прояснится.

0 голосов
/ 22 января 2019

Метка времени включена в исходную схему. Просто добавьте «метку времени», чтобы получить метку времени, как показано ниже.

val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")
...