Apache передает несколько групп потребителей в KafkaIO.read () |Недостаточно памяти - PullRequest
1 голос
/ 31 октября 2019

Я работаю над потоковой передачей Apache Beam. Я создал поток, который читает много тем и поместил все данные в GCS.

Мой KafkaIO.reader

KafkaIO.<String, AvroGenericRecord>read()
                .withBootstrapServers(bootstrapServers)
                .withConsumerConfigUpdates(configUpdates)
                .withTopics(inputTopics)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
                .withMaxNumRecords(maxNumRecords)
                .commitOffsetsInFinalize()
                .withoutMetadata();

In configUpdates Я положил ConsumerConfig.GROUP_ID_CONFIG значение.

Я хотел бы сделать так, чтобы я мог читать 2-3 группы потребителей, возможно ли достичь? Потому что у меня есть некоторые темы, данные которых приходят быстро, а некоторые нет.

UPD

Причина, по которой я хотел создать несколько групп потребителей, заключается в нехватке памяти о моей работе.

gcp#3|Caused by: java.lang.OutOfMemoryError: Java heap space
gcp#3|java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
gcp#3|        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
gcp#3|        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
gcp#3|        java.lang.Thread.run(Thread.java:745)
gcp#3|Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
gcp#3|        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
gcp#3|        org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:180)
gcp#3|        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
gcp#3|        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
gcp#3|        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
gcp#3|        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
gcp#3|        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
gcp#3|        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
gcp#3|        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
gcp#3|        java.lang.Thread.run(Thread.java:745)
gcp#3|Caused by: java.lang.OutOfMemoryError: Java heap space

Как я теперь понимаю,Я думаю, что проблема не в чтении из Кафки, а в неправильном управлении окнами. У меня много тем (40+), и я пытаюсь прочитать их все, много данных ... Я пытаюсь сделать время события оконным, чтобы обрабатывать все.

Этомое оконное управление:

 records.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardHours(options.getWindowInMinutes())))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                        .withLateFirings(AfterPane.elementCountAtLeast(options.getElementsCountToWaitAfterWatermark())))
                .withAllowedLateness(Duration.standardHours(1))
                .discardingFiredPanes()

UPD 2.0

Я думаю, что это происходит во время записи.

Это мой класс, который помещает avro-данные в сегменты GCP,Он должен поместить данные по названию темы и отметке времени. Окончательный вывод должен быть bucket / {topic} / {date} / {'avroContainerPerWindowOrPane'}

Вот как я это сделал.

public class DynamicAvroGenericRecordDestinations extends DynamicAvroDestinations<AvroGenericRecord, AvroDestination, GenericRecord> {
    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
    private final String baseDir;
    private final String fileExtension;

    public DynamicAvroGenericRecordDestinations(String baseDir, String fileExtension) {
        this.baseDir = baseDir;
        this.fileExtension = fileExtension;
    }

    @Override
    public Schema getSchema(AvroDestination destination) {
        return new Schema.Parser().parse(destination.jsonSchema);
    }

    @Override
    public GenericRecord formatRecord(AvroGenericRecord record) {
        return record.getRecord();
    }

    @Override
    public AvroDestination getDestination(AvroGenericRecord record) {
        Schema schema = record.getRecord().getSchema();
        return AvroDestination.of(record.getName(), record.getDate(), record.getVersionId(), schema.toString());
    }

    @Override
    public AvroDestination getDefaultDestination() {
        return new AvroDestination();
    }

    @Override
    public FileBasedSink.FilenamePolicy getFilenamePolicy(AvroDestination destination) {
        String pathStr = baseDir + "/" + destination.name + "/" + destination.date + "/" + destination.name;
        return new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(pathStr), destination.version, fileExtension);
    }

    private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        final ResourceId outputFilePrefix;
        final String fileExtension;
        final Integer version;

        WindowedFilenamePolicy(ResourceId outputFilePrefix, Integer version, String fileExtension) {
            this.outputFilePrefix = outputFilePrefix;
            this.version = version;
            this.fileExtension = fileExtension;
        }

        @Override
        public ResourceId windowedFilename(
                int shardNumber,
                int numShards,
                BoundedWindow window,
                PaneInfo paneInfo,
                FileBasedSink.OutputFileHints outputFileHints) {

            IntervalWindow intervalWindow = (IntervalWindow) window;

            String filenamePrefix =
                    outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");

            String filename =
                    String.format("%s-%s(%s-%s)-(%s-of-%s)%s", filenamePrefix,
                            version,
                            formatter.print(intervalWindow.start()),
                            formatter.print(intervalWindow.end()),
                            shardNumber,
                            numShards - 1,
                            fileExtension);
            ResourceId result = outputFilePrefix.getCurrentDirectory();
            return result.resolve(filename, RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
                int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                    DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
                            .withLabel("File Name Prefix"));
        }
    }

}

1 Ответ

0 голосов
/ 04 ноября 2019

Я не думаю, что KafkaIO позволяет иметь разные GROUP_ID в одном и том же преобразовании чтения Kafka. Что ж, мы допускаем две разные конфигурации для потребителей, но это потому, что под капотом на самом деле в KafkaIO есть два потребителя - для сообщений и для смещений, так что это другая история. Кстати, в чем проблема использования сообщений из тем с различной частотой поступления в вашем случае?

...