Как разобрать JSON в пучке трубопровода? - PullRequest
0 голосов
/ 15 ноября 2018

Мои данные имеют форму json с разделителями новой строки и выглядят так, как показано ниже.Я читаю данные такого типа из темы Кафки.

{"sender":"S1","senderHost":"ip-10-20-30-40","timestamp":"2018-08-13T16:17:12.874Z","topic":"test","messageType":"type_1","data":{"name":"John Doe", "id":"12DROIY321"}}

Я хочу построить конвейер луча apache, который читает эти данные из Kafka, анализирует этот формат json, чтобы получить вывод, как показано ниже:

S1,2018-08-13T16:17:12.874Z,type_1,12DROIY321

Вывод в основномстрока с разделителями-запятыми, состоящая из отправителя, отметки времени, messageType и id из данных.

Пока мой код такой:

public class Pipeline1{
    public static void main(String[] args){
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("test")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)

                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))

                // We're writing to a file, which does not support unbounded data sources. This line makes it bounded to
                // the first 35 records.
                // In reality, we would likely be writing to a data source that supports unbounded data, such as BigQuery.
                .withMaxNumRecords(35)

                .withoutMetadata() // PCollection<KV<Long, String>>
        )
                .apply(Values.<String>create())
                .apply(TextIO.write().to("test"));

        p.run().waitUntilFinish();

    }
}

Я не могу понять, как разобрать json, чтобы получить требуемый формат csv в конвейере.Используя приведенный выше код, я могу записать те же строки json в файл, а с помощью приведенного ниже кода я могу проанализировать json, но кто-нибудь может помочь мне разобраться, как выполнить это в качестве дополнительного шага с конвейером лучалогика?

JSONParser parser = new JSONParser();
            Object obj = null;
            try {
                obj = parser.parse(strLine);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            JSONObject jsonObject =  (JSONObject) obj;

            String sender = (String) jsonObject.get("sender");

            String messageType = (String) jsonObject.get("messageType");

            String timestamp = (String) jsonObject.get("timestamp");

            System.out.println(sender+","+timestamp+","+messageType);

1 Ответ

0 голосов
/ 15 ноября 2018

В соответствии с документацией вам необходимо написать преобразование (или найти преобразование, соответствующее вашему варианту использования).

https://beam.apache.org/documentation/programming-guide/#composite-transforms

Документация также предоставляет отличный пример.

Пример, который должен выдавать ваш вывод:

.apply(Values.<String>create())
.apply(
    "JSONtoData",                     // the transform name
    ParDo.of(new DoFn<String, String>() {    // a DoFn as an anonymous inner class instance
        @ProcessElement
        public void processElement(@Element String word, OutputReceiver<String> out) {
            JSONParser parser = new JSONParser();
            Object obj = null;
            try {
                obj = parser.parse(strLine);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            JSONObject jsonObject =  (JSONObject) obj;

            String sender = (String) jsonObject.get("sender");
            String messageType = (String) jsonObject.get("messageType");
            String timestamp = (String) jsonObject.get("timestamp");

            out.output(sender+","+timestamp+","+messageType);
        }
   }));

Чтобы вернуть значения CSV, просто измените обобщенные значения на:

new DoFn<String, YourCSVClassHere>()
OutputReceiver<YourCSVClassHere> out

Я не тестировал этот код, используйтена свой страх и риск.

...