Apache Beam KafkaIO обработка исключений в пипине - PullRequest
0 голосов
/ 19 апреля 2019

У меня есть пункт, который читает из KafkaIO. Direct Runner используется:

    PipelineOptions options = PipelineOptionsFactory.as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "tracker-statistics-group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    PTransform<PBegin, PCollection<KV<String, String>>> kafkaIo = KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrapAddress)
            .withTopic(topic)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(props)
            .withReadCommitted()
            // offset consumed by the pipeline can be committed back.
            .commitOffsetsInFinalize()
            .withoutMetadata();

    pipeline
            .apply(kafkaIo)
            .apply(Values.create())
            .apply("ParseEvent", ParDo.of(new ParseEventFn()))
            .apply("test", ParDo.of(new PrintFn()));

    pipeline.run();

И каждый раз, когда потребитель получает сообщение - он автоматически меняет смещение, даже потребитель ENABLE_AUTO_COMMIT_CONFIG имеет значение false. И когда происходит сбой моего конвейера (исключение времени выполнения), я больше не могу читать это сообщение, потому что оно уже зафиксировано. Я думал, что метод .commitOffsetsInFinalize() гарантирует, что только когда конвейер закончен, мы фиксируем сообщение. Как я могу получить это поведение? Есть ли в KafkaIO вариант для этого? Или я должен написать свой собственный KafkaIO, который обеспечит эту функциональность?

...