Spark Streaming: чтение JSON с Kafka и добавление event_time - PullRequest
0 голосов
/ 04 марта 2020

Я пытаюсь написать задание Stateful Spark Structured Streaming, которое читает из Kafka. Как часть требования мне нужно добавить 'event_time' в мой поток в качестве дополнительного столбца. Я пытаюсь что-то вроде этого:

val schema = spark.read.json("sample-data/test.json").schema
val myStream = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "myTopic")
      .load()
val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))
val withEventTime = df.selectExpr("*", "cast (value.arrivalTime as timestamp) as event_time")

Но я продолжаю получать сообщение:

не может разрешить «прибытие-время» с учетом входных столбцов: [значение]

Как мне ссылаться на все элементы в моем JSON?

1 Ответ

0 голосов
/ 04 марта 2020

Мне кажется, я смог решить эту проблему следующим образом:

val withEventTime = df.withColumn("event_time",to_timestamp(col("value. arrivalTime")))

Не уверен, почему это сработало, а не другое.

...