Мы создаем конвейер потока данных, который получит JSON и запишет в файл паркета.мы используем пакет org.apache.beam.sdk.io.parquet для записи файла.ParquetIO.Sink позволяет записать PC-коллекцию GenericRecord в файл Parquet (отсюда https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html).. Теперь мы узнаем, как преобразовать JsonObject (со сложной структурой) в GenericRecord.
Мы попытались сгенерироватьGenericRecord с использованием GenericRecordBuilder (org.apache.avro.generic.GenericRecordBuilder). И мы используем JsonObject из com.google.gson.JsonObject Но мы застряли, как преобразовать генерировать GenericRecord для JsonArray с Ojects
*Json
{
"event_name": "added_to_cart",
"event_id": "AMKL9877",
"attributes": [
{"key": "total", "value": "8982", "type": "double"},
{"key": "order_id", "value": "AKM1011", "type": "string"}
]
}
Наша схема
{
"type":"record",
"name":"event",
"fields":[
{
"name":"event_name",
"type":"string"
},
{
"name":"event_id",
"type":"string"
},
{
"name":"attributes",
"type":{
"type":"array",
"items":{
"type":"record",
"name":"attribute_data",
"fields":[
{
"name":"key",
"type":"string"
},
{
"name":"value",
"type":"string"
},
{
"name":"type",
"type":"string"
}
]
}
}
}
]
}
Наш код используется для преобразования JsonObject в GenericRecord с использованием GenericRecordBuilder
JsonObject event = element.getAsJsonObject();
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA);
for (Schema.Field field:SCHEMA.getFields()) {
System.out.println(field);
String at_header = field.getProp(FIELD_AT_HEADER_PROPERTY);
System.out.println(at_header);
if(at_header != null && at_header.equals(Boolean.TRUE.toString())){
recordBuilder.set(field.name(), null);
}else{
JsonElement keyElement = event.get(field.name());
recordBuilder.set(field.name(), getElementAsType(field.schema(), keyElement));
}
}
return recordBuilder.build();
Object getElementAsType(Schema schema, JsonElement element) {
if(element == null || element.isJsonNull())
return null;
switch(schema.getType()){
case BOOLEAN:
return element.getAsBoolean();
case DOUBLE:
return element.getAsDouble();
case FLOAT:
return element.getAsFloat();
case INT:
return element.getAsInt();
case LONG:
return element.getAsLong();
case NULL:
return null;
case ARRAY:
???
case MAP:
???
default:
return element.getAsString();
}
Нам нужно знать, как построить GenericRecord для сложныхнаберите как массив объектов, сопоставьте с JSON. Заранее спасибо.