Kafka: Consumer api: невозможность считывания и подтверждения вручную со смещения с помощью kafka-consumer-api - PullRequest
1 голос
/ 21 января 2020

Мой вариант использования - использовать kafka consumer-api, чтобы мы могли вручную прочитать смещение последних успешно обработанных данных из kafka-topi c и затем подтвердить вручную для успешно обработанных данных в Kafka , (Это должно было уменьшить потерю данных). Однако, с моей текущей реализацией, программа движется вперед и читает со следующего смещения, даже если я закомментирую 'ack.acknowledge ()' . Я новичок в Kafka и реализовал моего потребителя следующим образом (мы используем весеннюю загрузку)

Проблема в следующем: Даже если я закомментирую ack.acknowledge (), смещение все еще будет обновляется, и потребитель читает из следующего смещения, что является неожиданным (насколько я понимаю)

Consumer Config [Обратите внимание, что я установил ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG в false и установил factory.getContainerProperties (). setAckMode (ContainerProperties.AckMode.MANUAL_IMMEDIATE)]:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private AdapterStreamProperties appProperties;

    @Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
    private String receiveBufferBytes;

    @Bean
    public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "adapter");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

Тогда мой потребитель потребляет этот способ [Даже если я закомментирую «ack.acknowledge ()», он все еще читает из следующее смещение в следующий раз:

  @KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
  public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                     @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {

    System.out.println("----------------------------------------------------------------------------");
    System.out.println("Reading from Offset: " +  offset + ", and Partition: " + partition);
    System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " +   record.value());
    System.out.println("----------------------------------------------------------------------------");
    NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());

    if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
      kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
    }
    else {
      kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
    }
    ack.acknowledge();
  }

Версия API Кафки из моего gradle.build:

//Kafka Dependencie
implementation      'org.apache.kafka:kafka-streams:2.0.1'
implementation      'org.apache.kafka:kafka-clients:2.0.1'

Любое понимание будет полезно.

Заранее спасибо

1 Ответ

1 голос
/ 21 января 2020
ack.acknowledge()

означает только то, что вы подтвердите, что ваш потребитель успешно обработал сообщение.

Это обновит смещение группы потребителей, если ваше приложение остановится без подтверждения, текущее смещение не будет зафиксировано в kafka (из последних сообщений, которые вы обработали и не подтвердили).

Новый потребитель с той же группой потребителей (и теми же назначенными подразделениями) будет снова использовать то же сообщение, потому что приложение будет считывать начальную точку из смещений группы потребителей.

Если вы подтверждаете или не подтверждаете, не имеет Влияние вашего текущего слушателя, это просто продолжит обрабатывать новые сообщения (до тех пор, пока не произойдет ошибки, и я думаю, что перебалансировка также может быть источником перезагрузки смещения (не полная уверенность))

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...