Как выбрать один кодировщик для всех подклассов Avro SpecificRecordBase в Apache Beam? - PullRequest
0 голосов
/ 02 марта 2020

Фон

My Beam Pipeline предназначен для обработки элементов типа Avro SpecificRecordBase.

Чтобы упростить мою проблему, скажем, у меня сгенерировано два типа элементов в формате Avro все они имеют свои собственные поля:

class Dog extends SpecificRecordBase {
    ....
}

class Cat extends SpecificRecordBase {
   ...
}

Конвейер будет считывать элемент из входных данных Kafka, обрабатывать элементы и помещать обработанные элементы в выходные данные Kafka, как показано ниже:

Pipeline pipeline = Pipeline.create(getOptions());
pipeline.getCoderRegistry().registerCoderForClass(SpecificRecordBase.class, <what shall I put here?>);

pipeline.apply(kafkaReaderTransformer)
              .apply(Window.into(FixedWindows.of(Duration.standardSeconds(getWindowSize()))))
                .apply(GroupByKey.create())
                .apply(ParDo.of(GiveShowerToPetDoFn))
                .apply(Flatten.iterables())
                .apply(kafkaWriterTransformer);

Вопрос

Мой вопрос: как мне зарегистрировать кодер в моем конвейере? Поскольку в будущем пиплэйн может читать данные Cat Kafka или Dog Kafka и, возможно, Toad Kafka, мне нужен универсальный c способ регистрации кодера, который может сериализовать весь подкласс SpecificRecordBase, который определяется во время выполнения.

Мои неудачные решения

Я пробую следующее, чтобы заполнить пробел в коде:

  1. AvroCoder.of (SpecificRecordBase.class ): Не работает

    При запуске конвейера я получил ошибку ниже:

Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.specific.SpecificRecordBase
 at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
 at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
 at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
 at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
 at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
 ... 23 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class org.apache.avro.specific.SpecificRecordBase
 at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
 at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
 at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
 at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
 at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
 at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
 at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
 at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
 ... 27 more

SerializableCoder.of (SpecificRecordBase.class): сбивает с толку исключение

Это должно быть многообещающим вариантом, но я получил очень запутанную ошибку ниже, когда я запускаю конвейер, ниже вводит в заблуждение, потому что Cat фактически реализует сериализуемый по наследству от SpecificRecordBase:

   Caused by: java.lang.ClassCastException: Cat cannot be cast to java.io.Serializable
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:569)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
    at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:53)
    at org.apache.beam.runners.spark.coders.CoderHelpers.lambda$toByteFunction$28e77fe8$1(CoderHelpers.java:143)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1043)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1043)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Не устанавливайте энкодер самостоятельно, пусть Beam выводит. Система выведет подходящий кодер для меня. Решение «ничего не делать» работает на моей автономной машине loal, но когда я помещаю их в настоящий многосерверный env, они просто генерируют исключения, указывающие, что они не могут определить кодировщик.
Caused by: java.lang.IllegalStateException: Unable to return a default Coder for ParDo(Deserialize)/ParMultiDo(Deserialize).output [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.String, org.apache.avro.specific.SpecificRecordBase>: Unable to provide a Coder for org.apache.avro.specific.SpecificRecordBase.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:191)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:538)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)

1 Ответ

0 голосов
/ 13 апреля 2020

Я наконец решил проблему, опубликованную здесь, используя обходной путь.

Root причина

Оказалось, что это вызвано несовместимым кодировщиком в другой среде. Хотя кодировщик работает в моем локальном env, версия зависимости prod отличается, что делает библиотеку Beam неспособной кодировать и декодировать класс, полученный из SpecificRecordBase.

Два решения

1) Измените каждый doFun в вашем конвейере, используя байты в качестве входных данных и байты в качестве выходных данных:

public class GiveShowerToPetDoFn extends DoFn<KV<String, byte[]>, KV<String, byte[]>> {
   ...
}

Это означает, что вы будете десериализовывать объект из байтов вручную, прежде чем выполнять фактическую бизнес-логику c и сериализовать ваш результат обратно в байты в качестве последнего шага. Это заставляет Beam использовать кодировщик / декодер байтов по умолчанию между приложенным doFun, и кодер / декодер всегда будет работать, потому что он обрабатывает тип basi c, а не самоопределяемый тип.

2) Напишите свой собственный кодер / декодер для вашего пользовательского типа.

Решение 1 и 2 одинаковы по сути. В моем случае я использую первое решение, чтобы обойти мою проблему.

...