Apache Beam - использование пользовательских классов с универсальными типами и кодерами - PullRequest
0 голосов
/ 13 декабря 2018

Я начинающий пользователь Apache Beam.У меня не было проблем с реализацией логики для моих агрегатов, я просто немного раздражен тем, что не могу использовать абстрактные типы в своих пользовательских классах.

У меня есть несколько пользовательских классов, которые используются в качестве ввода / вывода для PTransforms.В этих классах я хотел бы использовать абстрактные типы, такие как, например, Set<String> вместо HashSet<String>, но я не могу, по крайней мере, использовать AvroCoder.Есть ли другой способ указать кодер, который автоматически поддерживает эти типы классов, или мне нужно указать свой собственный кодер.

Пример класса:

@DefaultCoder(AvroCoder.class)
public class WordStatistics {
  public String key;
  public Set<String> displays;
}

Исключение с помощью Setвместо HashSet

Caused by: java.lang.NoSuchMethodException: java.util.Set.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getDeclaredConstructor(Class.java:2178)
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
    at org.apache.avro.reflect.ReflectDatumReader.newArray(ReflectDatumReader.java:100)
    at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:133)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:317)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:113)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:260)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
    at com.unsilo.transforms.FromAccumulatorToStatsMapper.processElement(FromAccumulatorToStatsMapper.java:14)
    at com.unsilo.transforms.FromAccumulatorToStatsMapper$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:207)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
...