Как работать с массивом JSON как записями Кафки в потоковых запросах (Java)? - PullRequest
2 голосов
/ 02 декабря 2019

Я просмотрел множество примеров для чтения данных JSON из темы Кафа. Мне удалось сделать это успешно, если я прочитал одну запись из темы для каждого соединения, например:

{"customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
 "product": "Super widget",
 "price": 10,
 "bought_date": "2019-01-01"
}

Приведенный ниже код работает для приведенного выше варианта использования:

package io.examle;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {

    public static void main(String[] args) throws StreamingQueryException {

        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),  
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),          
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),               
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
            });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a DataSet representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                        .readStream()
                        .format("kafka")                
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", "utilization")
                        .load()
                        .selectExpr("CAST(value AS STRING) as json");

        dataset.printSchema();

        Column col = new Column("json");

        Dataset<Row> customers = dataset.select(functions.from_json(col,schema).as("data")).select("data.*");           
        customers.printSchema();

        customers.writeStream()      
        .format("console")
        .start()
        .awaitTermination();

    }

}

Но вышесказанное кажется мне неэффективным, то есть установление соединения с Kafa для получения единственной записи за соединение. Поэтому передача массива JSON в форме ниже будет более эффективной, на мой взгляд. Как можно снабдить его множеством «записей» на массив json.

[{
        "customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "498443a2-1479-11ea-8d71-362b9e155667",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-01-01"
    } 
]

Проблема в том, что я не могу распаковать массив JSON и обработать его. Приведенный ниже код завершается ошибкой:

package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {

    public static void main(String[] args) throws StreamingQueryException {

        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),  
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),          
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),               
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
            });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a DataSet representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                        .readStream()
                        .format("kafka")                
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", "utilization")
                        .load()
                        .selectExpr("CAST(value AS STRING) as json");

        dataset.printSchema();

        Column col = new Column("json");

        Dataset<Row> customers = dataset.select(functions.from_json(col,schema).as("data"));            


        Dataset<Row> data = customers.select(functions.explode_outer(functions.explode_outer(new Column("data"))));
        data.printSchema();

         data.writeStream()      
        .format("console")
        .start()
        .awaitTermination();
    }

}




Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`data`)' due to data type mismatch: input to function explode should be array or map type, not struct<customer_id:string,product:string,price:int,bought_date:string>;;

Вопросы:

1) Как правильно написать код, который будет эффективно распаковывать массив JSON? Я сомневаюсь, что подход, который я использовал выше для кода, который терпит неудачу, является лучшим, но я попытался следовать многим примерам, которые я видел относительно functions.explode () и т.д.

2) Если код, который терпит неудачу, являетсякакое-то чудо правильный подход. Как мне преобразовать структуру в массив или карту?

1 Ответ

2 голосов
/ 02 декабря 2019

Spark не тянет одну запись на соединение. API-интерфейс Kafka будет опрашивать пакет записей одновременно.

Что касается передовых методов работы с Kafka, несколько событий следует разделять на несколько объектов, а не объединять в массив, если только они не требуют корреляции, например, выбудет иметь «корзину» запись со списком «элементов» для заказа

Для того, чтобы ваш код работал, ваша схема должна быть ArrayType (не структура или карта).

StructType schema = new StructType(new StructField[]{
            new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),  
            new StructField("product", DataTypes.StringType, false, Metadata.empty()),          
            new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),               
            new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

ArrayType arrSchema = new ArrayType(schema, false);

Затем используйте схему массива при использовании from_json.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...