Мой вариант использования - использовать kafka consumer-api, чтобы мы могли вручную прочитать смещение последних успешно обработанных данных из kafka-topi c и затем подтвердить вручную для успешно обработанных данных в Kafka , (Это должно было уменьшить потерю данных). Однако, с моей текущей реализацией, программа движется вперед и читает со следующего смещения, даже если я закомментирую 'ack.acknowledge ()' . Я новичок в Kafka и реализовал моего потребителя следующим образом (мы используем весеннюю загрузку)
Проблема в следующем: Даже если я закомментирую ack.acknowledge (), смещение все еще будет обновляется, и потребитель читает из следующего смещения, что является неожиданным (насколько я понимаю)
Consumer Config [Обратите внимание, что я установил ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG в false и установил factory.getContainerProperties (). setAckMode (ContainerProperties.AckMode.MANUAL_IMMEDIATE)]:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Autowired
private AdapterStreamProperties appProperties;
@Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
private String receiveBufferBytes;
@Bean
public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"adapter");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
Тогда мой потребитель потребляет этот способ [Даже если я закомментирую «ack.acknowledge ()», он все еще читает из следующее смещение в следующий раз:
@KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {
System.out.println("----------------------------------------------------------------------------");
System.out.println("Reading from Offset: " + offset + ", and Partition: " + partition);
System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " + record.value());
System.out.println("----------------------------------------------------------------------------");
NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());
if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
}
else {
kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
}
ack.acknowledge();
}
Версия API Кафки из моего gradle.build:
//Kafka Dependencie
implementation 'org.apache.kafka:kafka-streams:2.0.1'
implementation 'org.apache.kafka:kafka-clients:2.0.1'
Любое понимание будет полезно.
Заранее спасибо