JAVA - Apache BEAM-GCP: GroupByKey отлично работает с Direct Runner, но не работает с Dataflow runner - PullRequest
1 голос
/ 11 марта 2020

Я тестировал свой код с помощью бегуна Dataflow, однако он возвращает ошибку:

> Error message from worker: java.lang.RuntimeException:
> org.apache.beam.sdk.util.UserCodeException:
> com.fasterxml.jackson.core.JsonParseException: Unrecognized token
> 'WindowReiterable[ ] 
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:114)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.beam.sdk.util.UserCodeException:
> com.fasterxml.jackson.core.JsonParseException: Unrecognized token
> 'WindowReiterable': was expecting ('true', 'false' or 'null') at
> [Source: (String)"WindowReiterable []

Обратите внимание, что я использовал тот же код с Direct Runner, и он работает просто отлично. Кто-нибудь когда-нибудь сталкивался с этой проблемой? Если да, подскажите, пожалуйста, как это решить? Или я должен заменить GroupByKey другой функцией ...?

Вот код:

PCollection<KV<String, Iterable<String>>> KVElements =
        pipeline.apply("Reads the input fixed-width file", TextIO
                .read()
                .from(options.getPolledFile())).apply("Converts to KV elements, ParDo.of(new DoFn<String, String>(){
            @ProcessElement
            public void processElement(ProcessContext c) {
                String element = c.element();
        String[] columns = (“key;col1;col2;col3”).split(";");
        String[] values = element.split(";");
            ObjectNode rowToJson = jsonParser.createObjectNode();
        for (int i = 0; i < columns.length; i++) {
             rowToJson.put(columns[i], values[i].trim());
        }

    c.output(KV.of(rowToJson.get(“key”).asText(), rowToJson.toString()));

}}));

PCollection <KV<String, Iterable<String>>> joinedCollection = KVElements.apply(GroupByKey.create());

PCollection  <String> joined = (PCollection<String>) joinedCollection.apply("Converts to json string", ParDo.of(new DoFn<KV<String, Iterable<String>>, String>(){

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        KV<String, Iterable<String>> element = c.element();
        JsonNode parsed = jsonParser.readTree(String.valueOf(element.getValue()));
        final ObjectMapper mapper = new ObjectMapper();
        ObjectNode KVJson = mapper.createObjectNode();
        String value = null;

        for (int i =0; i<parsed.size();i++){
            KVJson.put("col1",parsed.get(i).get("col1"));
            KVJson.put("col2",parsed.get(i).get("col2"));
            KVJson.put("col3",parsed.get(i).get("col3"));
            }

        c.output(KVJson.toString());

}}));

Версия Apache Beam: 2.17.0

1 Ответ

1 голос
/ 15 марта 2020

Похоже, что ParDo не определен правильно. Во фрагменте кода

"Converts to KV elements, ParDo.of(new DoFn<String, String>

должен быть изменен, чтобы соответствовать результату KV, который генерируется как выходной, что-то вроде ниже

"Converts to KV elements, ParDo.of(new DoFn<String, KV<String, Iterable<String>>>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...