Я просмотрел множество примеров для чтения данных JSON из темы Кафа. Мне удалось сделать это успешно, если я прочитал одну запись из темы для каждого соединения, например:
{"customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
"product": "Super widget",
"price": 10,
"bought_date": "2019-01-01"
}
Приведенный ниже код работает для приведенного выше варианта использования:
package io.examle;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class Stackoverflow {
public static void main(String[] args) throws StreamingQueryException {
StructType schema = new StructType(new StructField[]{
new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
new StructField("product", DataTypes.StringType, false, Metadata.empty()),
new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
});
SparkSession spark = SparkSession
.builder()
.appName("SimpleExample")
.getOrCreate();
// Create a DataSet representing the stream of input lines from Kafka
Dataset<Row> dataset = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "utilization")
.load()
.selectExpr("CAST(value AS STRING) as json");
dataset.printSchema();
Column col = new Column("json");
Dataset<Row> customers = dataset.select(functions.from_json(col,schema).as("data")).select("data.*");
customers.printSchema();
customers.writeStream()
.format("console")
.start()
.awaitTermination();
}
}
Но вышесказанное кажется мне неэффективным, то есть установление соединения с Kafa для получения единственной записи за соединение. Поэтому передача массива JSON в форме ниже будет более эффективной, на мой взгляд. Как можно снабдить его множеством «записей» на массив json.
[{
"customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
"product": "Super widget",
"price": 10,
"bought_date": "2019-01-01"
},
{
"customer_id": "498443a2-1479-11ea-8d71-362b9e155667",
"product": "Food widget",
"price": 4,
"bought_date": "2019-01-01"
}
]
Проблема в том, что я не могу распаковать массив JSON и обработать его. Приведенный ниже код завершается ошибкой:
package io.example;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class Stackoverflow {
public static void main(String[] args) throws StreamingQueryException {
StructType schema = new StructType(new StructField[]{
new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
new StructField("product", DataTypes.StringType, false, Metadata.empty()),
new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
});
SparkSession spark = SparkSession
.builder()
.appName("SimpleExample")
.getOrCreate();
// Create a DataSet representing the stream of input lines from Kafka
Dataset<Row> dataset = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "utilization")
.load()
.selectExpr("CAST(value AS STRING) as json");
dataset.printSchema();
Column col = new Column("json");
Dataset<Row> customers = dataset.select(functions.from_json(col,schema).as("data"));
Dataset<Row> data = customers.select(functions.explode_outer(functions.explode_outer(new Column("data"))));
data.printSchema();
data.writeStream()
.format("console")
.start()
.awaitTermination();
}
}
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`data`)' due to data type mismatch: input to function explode should be array or map type, not struct<customer_id:string,product:string,price:int,bought_date:string>;;
Вопросы:
1) Как правильно написать код, который будет эффективно распаковывать массив JSON? Я сомневаюсь, что подход, который я использовал выше для кода, который терпит неудачу, является лучшим, но я попытался следовать многим примерам, которые я видел относительно functions.explode () и т.д.
2) Если код, который терпит неудачу, являетсякакое-то чудо правильный подход. Как мне преобразовать структуру в массив или карту?