Apache Spark 2.4 Структура потокового чтения из Kafka всегда Двоичная структура после применения нескольких преобразований - PullRequest
0 голосов
/ 14 ноября 2018

Я работаю с Apache Spark 2.4 и читаю данные json из kafka после применения многократного преобразования к потоковому запросу, окончательный вывод все еще двоичный.

val streamingDF = sparkSession.readStream
      .format("kafka")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", value = false)
      .option("maxOffsetsPerTrigger", 50000L)
      .option("kafka.bootstrap.servers", "kafka_server")
      .option("enable.auto.commit" , "false")
      .load()

val dataSet = streamingDF.selectExpr("CAST(value AS STRING)").as[String] 
val stream = dataSet.map{value => convertJSONToCaseClass(value)}
.map{data => futherconvertions(data)}.writeStream.format("console")
.outputMode(OutputMode.Update()).start()

После этого я получаю вывод, подобный этому, на консоль.

Batch: 8
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|

Ожидаемый вывод: кадр данных с несколькими столбцами

Есть ли что-то, что я делаю не так. Любая помощь будет оценена.

Спасибо

Ответы [ 2 ]

0 голосов
/ 01 февраля 2019

Spark 2.4 не поддерживает цепочку из нескольких агрегатов.

https://spark.apache.org/docs/2.4.0/structured-streaming-programming- guide.html # unsupported-operations

Несколько потоковых агрегатов (т. Е. Цепочка агрегатов в потоковом DF) еще не поддерживаются в потоковых наборах данных.

0 голосов
/ 27 ноября 2018

Не рекомендуется устанавливать «enable.auto.commit» в соответствии с docs.please смотрите конкретные конфигурации kafka https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html Также вы можете попробовать его, как показано ниже:

val streamingDF = sparkSession.readStream
  .format("kafka")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", value = false)
  .option("maxOffsetsPerTrigger", 50000L)
  .option("kafka.bootstrap.servers", "kafka_server")
  .load()
val df = streamingDF.selectExpr("CAST(value as STRING)")         

 val mySchema = StructType(Array(
  StructField("X", StringType, true),
  StructField("Y", StringType, true),
  StructField("Z", StringType, true))                            

val Resultdf = df.select(from_json($"value", mySchema).as("data")).select("data.*")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...