Почему Apache Beam не может определить кодер по умолчанию при использовании KV? - PullRequest
0 голосов
/ 04 июня 2019

Я реализую CombinePerKeyExample , используя подкласс CombineFn вместо использования SerializableFunction.

package me.examples;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

import java.util.HashSet;
import java.util.Set;

public class ConcatWordsCombineFn extends CombineFn<String, ConcatWordsCombineFn.Accumulator, String> {
    @DefaultCoder(AvroCoder.class)
    public static class Accumulator{
        HashSet<String> plays;
    }

    @Override
    public Accumulator createAccumulator(){
        Accumulator accumulator = new Accumulator();
        accumulator.plays = new HashSet<>();
        return accumulator;
    }

    @Override
    public Accumulator addInput(Accumulator accumulator, String input){
        accumulator.plays.add(input);
        return accumulator;
    }

    @Override
    public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators){
        Accumulator mergeAccumulator = new Accumulator();
        mergeAccumulator.plays = new HashSet<>();

        for(Accumulator accumulator: accumulators){
            mergeAccumulator.plays.addAll(accumulator.plays);
        }

        return mergeAccumulator;
    }

    @Override
    public String extractOutput(Accumulator accumulator){
        return String.join(",", accumulator.plays);
    }
}

Трубопровод состоит из ReadFromBigQuery, ExtractAllPlaysOfWords (код ниже) и WriteToBigQuery

package me.examples;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class PlaysForWord extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {


    @Override
    public PCollection<TableRow> expand(PCollection<TableRow> input) {

            PCollection<KV<String, String>> largeWords = input.apply("ExtractLargeWords", ParDo.of(new ExtractLargeWordsFn()));

            //PCollection<KV<String, String>> wordNPlays = largeWords.apply("CombinePlays", Combine.perKey(new ConcatWordsCombineFunction()));
            //using CombineFn instead
            PCollection<KV<String, String>> wordNPlays = largeWords.apply("CombinePlays",Combine.perKey(new ConcatWordsCombineFn()));
            wordNPlays.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
            PCollection<TableRow> rows = wordNPlays.apply("FormatToRow", ParDo.of(new FormatShakespeareOutputFn()));
            return rows;
    }
}

Если я не добавляю эту строку в коде выше

 wordNPlays.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

У меня исключение

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for ExtractAllPlaysOfWords/CombinePlays/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
          No Coder has been manually specified;  you may do so using .setCoder().
          Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
          Building a Coder using a registered CoderProvider failed.
          See suppressed exceptions for detailed failures.
          Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
                at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
                at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
                at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
                at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:191)
                at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
                at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
                at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
                at me.examples.PlaysForWord.expand(PlaysForWord.java:21)
                at me.examples.PlaysForWord.expand(PlaysForWord.java:10)
                at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
                at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
                at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
                at me.examples.Main.main(Main.java:41)

Из стековой трассировки я думаю, что конвейер не может получить кодер для типа String объекта KV. Это почему ? Не должен быть "известным" типом для Apache Beam. Почему он работает без указания кодера при использовании подкласса SerializableFunction в Combine.perKey?

В дополнение к этому, когда я пытался получить кодировщик по умолчанию для String из реестра кодировщиков, я получаю StringUTF8Coder

Coder coder = null;
       try {
           coder = pipeline.getCoderRegistry().getCoder(String.class);
           logger.info("coder is " + coder);
       } catch (Exception e){
           logger.info("exception "+ e.getMessage() +"\n coder is " + coder );
       }
/*result
INFO: coder is StringUtf8Coder
*/

Я использовал Apache Beam 2.12.0 и запустил его в Google Dataflow

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...