У меня есть пункт, который читает из KafkaIO. Direct Runner используется:
PipelineOptions options = PipelineOptionsFactory.as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "tracker-statistics-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
PTransform<PBegin, PCollection<KV<String, String>>> kafkaIo = KafkaIO.<String, String>read()
.withBootstrapServers(bootstrapAddress)
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
.withoutMetadata();
pipeline
.apply(kafkaIo)
.apply(Values.create())
.apply("ParseEvent", ParDo.of(new ParseEventFn()))
.apply("test", ParDo.of(new PrintFn()));
pipeline.run();
И каждый раз, когда потребитель получает сообщение - он автоматически меняет смещение, даже потребитель ENABLE_AUTO_COMMIT_CONFIG имеет значение false. И когда происходит сбой моего конвейера (исключение времени выполнения), я больше не могу читать это сообщение, потому что оно уже зафиксировано. Я думал, что метод .commitOffsetsInFinalize()
гарантирует, что только когда конвейер закончен, мы фиксируем сообщение. Как я могу получить это поведение? Есть ли в KafkaIO вариант для этого? Или я должен написать свой собственный KafkaIO, который обеспечит эту функциональность?