Не удается преобразовать данные Kafka Json в потоковую структурированную потоковую обработку - PullRequest
1 голос
/ 26 марта 2019

Я пытаюсь получить сообщения Кафки и обрабатывать их с помощью Spark в автономном режиме. Кафка хранит данные в формате JSON. Я могу получать сообщения Kafka, но не могу проанализировать данные json с помощью определения схемы.

Когда я запускаю команду bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_kafka_topic --from-beginning для просмотра сообщений kafka в теме kafka, она выдает следующее:

"{\"timestamp\":1553792312117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":21,\"q\":true,\"t\":1553792311686}]}"
"{\"timestamp\":1553792317117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":22,\"q\":true,\"t\":1553792316688}]}"

И я могу успешно получить эти данные с помощью этого блока кода в Spark:

df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(col("value").cast("string"))

Схема такая:

df.printSchema()

root
 |-- value: string (nullable = true)

А затем записывает этот кадр данных в консоль, и он печатает сообщения kafka:

Batch: 9
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|"{\"timestamp\":1...|
+--------------------+

Но я хочу проанализировать данные json, чтобы определить схему и блок кода, который я пытался сделать:

schema = StructType([ StructField("timestamp", LongType(), False), StructField("values", ArrayType( StructType([ StructField("id", StringType(), True), StructField("v", IntegerType(), False), StructField("q", BooleanType(), False), StructField("t", LongType(), False) ]), True ), True) ])

parsed = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "my_kafka_topic") \
  .load() \
  .select(from_json(col("value").cast("string"), schema).alias("opc"))

И схема parsed кадра данных:

parsed.printSchema()
root
  |-- opc: struct (nullable = true)
  |    |-- timestamp: string (nullable = true)
  |    |-- values: struct (nullable = true)
  |    |    |-- id: string (nullable = true)
  |    |    |-- v: integer (nullable = true)
  |    |    |-- q: boolean (nullable = true)
  |    |    |-- t: string (nullable = true)

Эти кодовые блоки работают без ошибок. Но когда я хочу записать parsed dataframe на консоль:

query = parsed\
   .writeStream\
   .format("console")\
   .start()

query.awaitTermination()

это пишет null как это в консоли:

+----+
| opc|
+----+
|null|
+----+

Итак, похоже, что есть проблема с анализом данных json, но я не могу понять это.

Можете ли вы сказать мне, что не так?

1 Ответ

0 голосов
/ 28 марта 2019

Кажется, что схема была неправильной для вашего случая, попробуйте применить следующее:

schema = StructType([ 
StructField("timestamp", LongType(), False), 
StructField("values", ArrayType(
    StructType([StructField("id", StringType(), True), 
    StructField("v", IntegerType(), False), 
    StructField("q", BooleanType(), False), 
    StructField("t", LongType(), False)]), True), True)])

Также помните, что опция inferSchema работает очень хорошо, так что вы можете позволить Spark обнаружить схемуи сохраните его.

Другая проблема состоит в том, что ваши данные json имеют начальные и конечные двойные кавычки ", а также содержат \ те, которые делают недопустимый JSON, который не позволял Spark анализировать сообщение.

Чтобы удалить недопустимые символы, ваш код должен быть изменен следующим образом:

parsed = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "my_kafka_topic") \
  .load() \
  .withColumn("value", regexp_replace(col("value").cast("string"), "\\\\", "")) \
  .withColumn("value", regexp_replace(col("value"), "^\"|\"$", "")) \
  .select(from_json(col("value"), schema).alias("opc"))

Теперь ваш вывод должен быть:

+------------------------------------------------------------------------------------------------------------------+
|value                                                                                                             |
+------------------------------------------------------------------------------------------------------------------+
|{"timestamp":1553588718638,"values":[{"id":"Simulation.Simulator.Temperature","v":26,"q":true,"t":1553588717036}]}|
+------------------------------------------------------------------------------------------------------------------+

Удачи!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...