Spark структурированная потоковая передача с форматированием ввода JSON Kafka в JAVA - PullRequest
0 голосов
/ 01 мая 2018

Нашел несколько идей для Scala, но не смог реализовать на Java с успехом, поэтому опубликовал как новый вопрос.

Мне нужно отформатировать входной JSON в потоковом столбце "value" из темы Kafka

Dataset<Row> output = df.select(functions.from_json(df.col("value"), schema));

StructType schema = new StructType();
schema.add("Id", DataTypes.StringType);
schema.add("Type", DataTypes.StringType);
schema.add("KEY", DataTypes.StringType);
schema.add("condition", DataTypes.IntegerType);
schema.add("seller_Id", DataTypes.IntegerType);
schema.add("seller_Name", DataTypes.StringType);
schema.add("isActive", DataTypes.BooleanType);

Достигнуто до того момента, когда вы увидите печать на консольном приемнике -

StreamingQuery query = output.writeStream().format("console").start();

+-------------------------+ 
|     jsontostructs(value)|
+-------------------------+
|                    []   |
+-------------------------+

Посоветуйте, пожалуйста, как получить отдельные столбцы из этой структуры.

Ответы [ 2 ]

0 голосов
/ 30 января 2019

Таким образом, в основном нужно использовать функцию «from_json» в сочетании с функцией schema.json () для получения схемы String (аналогично тому, что Filip упоминал выше в scala). Надеюсь, это кому-нибудь поможет.

StructType schema = new StructType();
schema.add("Id", DataTypes.StringType);
schema.add("Type", DataTypes.StringType);
schema.add("KEY", DataTypes.StringType);
schema.add("condition", DataTypes.IntegerType);
schema.add("seller_Id", DataTypes.IntegerType);
schema.add("seller_Name", DataTypes.StringType);
schema.add("isActive", DataTypes.BooleanType);

Dataset<Row> output = df.select(from_json(df.col("value"), DataType.fromjson(schema.json())).as("data")).select("data.*");

последний выбор сгладит структуру в поле, определенное непосредственно в схеме.

0 голосов
/ 13 июля 2018

У вас уже есть схема, определенная для вашего сообщения JSON ...

val sparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("test")
    .getOrCreate()

val df: DataFrame = sparkSession
    .readStream
    .format("kafka")...

import org.apache.spark.sql.functions._
import sparkSession.implicits._

val ds = df.select($"value" cast "string" as "json")
        .select(from_json($"json", schema) as "data")
        .select("data.*")

Обратите внимание, что append режим вывода не поддерживается, когда вы путаетесь с потоковыми агрегатами на потоковой DF / DS без водяных знаков, поэтому, если вы хотите сходить с ума с агрегациями, не забудьте обновить вывод до чего-то следующего :

val query = aggregations
           .writeStream
           .outputMode("complete")
           .format("console")
           .start()

query.awaitTermination()
...