Apache beam IOException в декодере - PullRequest
       96

Apache beam IOException в декодере

0 голосов
/ 22 октября 2019

У меня есть простой конвейер, который читает из Kafka читателем KafkaIO и затем преобразует в конвейер. В конце он записывает в GCP в формате avro. Поэтому, когда я запускаю конвейер в DataFlow, он работает отлично, но когда бегун является DirectRunner, он читает все данные из тем и выдает исключение.

java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:84)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:251)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:237)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
    at org.apache.beam.repackaged.direct_java.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.processElement(GroupAlsoByWindowEvaluatorFactory.java:185)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:136)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
    ... 19 more

Я использую собственный сериализатор и десериализатор для чтения avro и получения paylod.

Kafka Reader

  private PTransform<PBegin, PCollection<KV<String, AvroGenericRecord>>> createKafkaRead(Map<String, Object> configUpdates) {
        return KafkaIO.<String, AvroGenericRecord>read()
                .withBootstrapServers(bootstrapServers)
                .withConsumerConfigUpdates(configUpdates)
                .withTopics(inputTopics)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
                .withMaxNumRecords(maxNumRecords)
                .commitOffsetsInFinalize()
                .withoutMetadata();
    }

AvroGenericCoder

public class AvroGenericCoder extends CustomCoder<AvroGenericRecord> {
    private final Map<String, Object> config;
    private transient BeamKafkaAvroGenericDeserializer deserializer;
    private transient BeamKafkaAvroGenericSerializer serializer;

    public static AvroGenericCoder of(Map<String, Object> config) {
        return new AvroGenericCoder(config);
    }

    protected AvroGenericCoder(Map<String, Object> config) {
        this.config = config;
    }

    private BeamKafkaAvroGenericDeserializer getDeserializer() {
        if (deserializer == null) {
            BeamKafkaAvroGenericDeserializer d = new BeamKafkaAvroGenericDeserializer();
            d.configure(config, false);
            deserializer = d;
        }
        return deserializer;
    }

    private BeamKafkaAvroGenericSerializer getSerializer() {
        if (serializer == null) {
            serializer = new BeamKafkaAvroGenericSerializer();
        }
        return serializer;
    }

    @Override
    public void encode(AvroGenericRecord record, OutputStream outStream) {
        getSerializer().serialize(record, outStream);
    }

    @Override
    public AvroGenericRecord decode(InputStream inStream) {
        try {
            return getDeserializer().deserialize(null, IOUtils.toByteArray(inStream));
        } catch (IOException e) {
            throw new RuntimeException("Error translating into bytes ", e);
        }
    }

    @Override
    public void verifyDeterministic() {
    }

    @Override
    public Object structuralValue(AvroGenericRecord value) {
        return super.structuralValue(value);
    }

    @Override
    public int hashCode() {
        return HashCodeBuilder.reflectionHashCode(this);
    }

    @Override
    public boolean equals(Object obj) {
        return EqualsBuilder.reflectionEquals(this, obj);
    }
}

Это магистральный трубопровод

PCollection<AvroGenericRecord> records = p.apply(readKafkaTr)
                .apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardMinutes(options.getWindowInMinutes())))
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardMinutes(options.getWindowInMinutes())))
                                .withLateFirings(AfterPane.elementCountAtLeast(options.getElementsCountToWaitAfterWatermark())))
                        .withAllowedLateness(Duration.standardDays(options.getAfterWatermarkInDays()))
                        .discardingFiredPanes()
                );

        records.apply(Filter.by((ProcessFunction<AvroGenericRecord, Boolean>) Objects::nonNull))
                .apply(new WriteAvroFilesTr(options.getBasePath(), options.getNumberOfShards()));

1 Ответ

0 голосов
/ 23 октября 2019

Да, я думаю, что @RyanSkraba прав - DirectRunner делает много вещей, которые делают не все другие бегуны (поскольку первоначальная цель DirectRunner должна была использоваться для тестирования, поэтому он выполняет много дополнительных проверок по сравнению с другими бегунами).

Кстати, почему бы не использовать Beam AvroCoder в этом случае? Простой пример, как использовать его с KafkaIO: https://github.com/aromanenko-dev/beam-issues/blob/master/kafka-io/src/main/java/KafkaAvro.java

...