Как мне разобрать json сообщений от Кафки в Spark Streaming? Я конвертирую JavaRDD в набор данных и оттуда извлекаю значения. Обнаружен успех при извлечении значений, однако я не могу извлечь вложенные json значения, такие как "host.name" и "fields.type".
Входящее сообщение от Kafka:
{
"@timestamp": "2020-03-03T10:48:03.160Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "7.6.0"
},
"host": {
"name": "test.com"
},
"agent": {
"id": "7651453414",
"version": "7.6.0",
"type": "filebeat",
"ephemeral_id": "71983698531",
"hostname": "test"
},
"message": "testing",
"log": {
"file": {
"path": "/test.log"
},
"offset": 250553
},
"input": {
"type": "log"
},
"fields": {
"type": "test"
},
"ecs": {
"version": "1.4.0"
}
}
Искровой код:
StructField[] structFields = new StructField[] {
new StructField("message", DataTypes.StringType, true, Metadata.empty()) };
StructType structType = new StructType(structFields);
StructField[] structFields2 = new StructField[] {
new StructField("host", DataTypes.StringType, true, Metadata.empty()),
new StructField("fields", DataTypes.StringType, true, Metadata.empty()),
new StructField("message", DataTypes.StringType, true, Metadata.empty()) };
StructType structType2 = new StructType(structFields2);
JavaRDD<Row> rowRDD = rdd.map(new Function<ConsumerRecord<String, String>, Row>() {
/**
*
*/
private static final long serialVersionUID = -8817714250698168398L;
@Override
public Row call(ConsumerRecord<String, String> r) {
Row row = RowFactory.create(r.value());
return row;
}
});
Dataset<Row> rowExtracted = spark.createDataFrame(rowRDD.rdd(), structType)
.select(functions.from_json(functions.col("message"), structType2).as("data")).select("data.*");
rowExtracted.printSchema();
rowExtracted.show((int) rowExtracted.count(), false);
PrintSchema:
root
|-- host: string (nullable = true)
|-- fields: string (nullable = true)
|-- message: string (nullable = true)
Фактический результат:
+---------------+---------------+-------+
|host |fields |message|
+---------------+---------------+-------+
|{"name":"test"}|{"type":"test"}|testing|
+---------------+---------------+-------+
Ожидаемый результат:
+---------------+---------------+-------+
|host |fields |message|
+---------------+---------------+-------+
|test |test |testing|
+---------------+---------------+-------+