Мои данные имеют форму json с разделителями новой строки и выглядят так, как показано ниже.Я читаю данные такого типа из темы Кафки.
{"sender":"S1","senderHost":"ip-10-20-30-40","timestamp":"2018-08-13T16:17:12.874Z","topic":"test","messageType":"type_1","data":{"name":"John Doe", "id":"12DROIY321"}}
Я хочу построить конвейер луча apache, который читает эти данные из Kafka, анализирует этот формат json, чтобы получить вывод, как показано ниже:
S1,2018-08-13T16:17:12.874Z,type_1,12DROIY321
Вывод в основномстрока с разделителями-запятыми, состоящая из отправителя, отметки времени, messageType и id из данных.
Пока мой код такой:
public class Pipeline1{
public static void main(String[] args){
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
// We're writing to a file, which does not support unbounded data sources. This line makes it bounded to
// the first 35 records.
// In reality, we would likely be writing to a data source that supports unbounded data, such as BigQuery.
.withMaxNumRecords(35)
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create())
.apply(TextIO.write().to("test"));
p.run().waitUntilFinish();
}
}
Я не могу понять, как разобрать json, чтобы получить требуемый формат csv в конвейере.Используя приведенный выше код, я могу записать те же строки json в файл, а с помощью приведенного ниже кода я могу проанализировать json, но кто-нибудь может помочь мне разобраться, как выполнить это в качестве дополнительного шага с конвейером лучалогика?
JSONParser parser = new JSONParser();
Object obj = null;
try {
obj = parser.parse(strLine);
} catch (ParseException e) {
e.printStackTrace();
}
JSONObject jsonObject = (JSONObject) obj;
String sender = (String) jsonObject.get("sender");
String messageType = (String) jsonObject.get("messageType");
String timestamp = (String) jsonObject.get("timestamp");
System.out.println(sender+","+timestamp+","+messageType);