Потратив несколько дней, я пришел к следующему решению:
Основная идея заключается в том, чтобы выполнить синхронизацию в двух режимах, а именно: Восстановление и Обычный
- В режиме восстановленияЯ только потребляю данные, но я не создаю никаких данных.
- В обычном режиме я потребляю и создаю данные.
В Kafka я реализовал это с использованием двух слушателей, принадлежащих к разным группам потребителей.,При запуске все слушатели останавливаются, и я решаю, с каким типом слушателя включается.Как только смещение всех слушателей восстановления достигает водяных знаков обычных слушателей, я останавливаю списки восстановления и запускаю обычных слушателей.
Ниже соответствующей части моего кода:
public void startListeners() {
log.debug("get partitions from application");
final List<KafkaPartitionStateKey> partitions = getPartitions();
log.debug("load partition state from hazelcast");
final Map<KafkaPartitionStateKey, KafkaPartitionState> kafkaPartitionStates = kafkaPartitionStateService.loadKafkaPartitionStateMap();
log.debug("check if in sync");
if (areAllPartitionsReady(partitions, kafkaPartitionStates)) {
log.info("all partitions ready, not need to start recovery");
this.messageListenerContainers.forEach(this::startContainer);
return;
}
log.debug("load consumer group offsets from kafka");
consumerGroupOffsets = getConsumerGroupOffsets();
log.debug("create missing partition states");
final List<KafkaPartitionState> updatedPartitionStates = getOrCreatePartitionStates(partitions, kafkaPartitionStates, consumerGroupOffsets);
log.debug("check if all partitions are ready");
if (getNumberOfNotReadyPartitions(updatedPartitionStates) == 0) {
log.info("all partitions ready, no need to start recovery");
this.messageListenerContainers.forEach(this::startContainer);
return;
}
log.info("----- STARTING RECOVERY -----");
this.recoveryListenerContainers.forEach(this::startContainer);
}
Я надеюсьэто кому-то полезно ...