Event Sourcing с Kafka для микросервисов - PullRequest
0 голосов
/ 05 октября 2018

У меня есть несколько микросервисов, которые используют данные из kafka.Они потребляют и передают данные брокеру.

Эти микросервисы имеют только энергозависимое хранилище (Hazelcast).Когда хранилище теряется, мне нужно восстановить его на основе основных данных, находящихся в kafka.

Моя наивная реализация просто снова потребляет эти данные, но затем я выдаю некоторые старые данные брокеру.Это снова вызывает другие микросервисы, что кажется плохой идеей.

Есть ли стандартный способ обработки этого варианта использования?Мне кажется, что это очень распространенная проблема, или я что-то не так делаю?

Ответы [ 2 ]

0 голосов
/ 08 октября 2018

Потратив несколько дней, я пришел к следующему решению:

Основная идея заключается в том, чтобы выполнить синхронизацию в двух режимах, а именно: Восстановление и Обычный

  • В режиме восстановленияЯ только потребляю данные, но я не создаю никаких данных.
  • В обычном режиме я потребляю и создаю данные.

В Kafka я реализовал это с использованием двух слушателей, принадлежащих к разным группам потребителей.,При запуске все слушатели останавливаются, и я решаю, с каким типом слушателя включается.Как только смещение всех слушателей восстановления достигает водяных знаков обычных слушателей, я останавливаю списки восстановления и запускаю обычных слушателей.

Ниже соответствующей части моего кода:

public void startListeners() {
    log.debug("get partitions from application");
    final List<KafkaPartitionStateKey> partitions = getPartitions();

    log.debug("load partition state from hazelcast");
    final Map<KafkaPartitionStateKey, KafkaPartitionState> kafkaPartitionStates = kafkaPartitionStateService.loadKafkaPartitionStateMap();

    log.debug("check if in sync");
    if (areAllPartitionsReady(partitions, kafkaPartitionStates)) {
        log.info("all partitions ready, not need to start recovery");
        this.messageListenerContainers.forEach(this::startContainer);
        return;
    }

    log.debug("load consumer group offsets from kafka");
    consumerGroupOffsets = getConsumerGroupOffsets();

    log.debug("create missing partition states");
    final List<KafkaPartitionState> updatedPartitionStates = getOrCreatePartitionStates(partitions, kafkaPartitionStates, consumerGroupOffsets);

    log.debug("check if all partitions are ready");
    if (getNumberOfNotReadyPartitions(updatedPartitionStates) == 0) {
        log.info("all partitions ready, no need to start recovery");
        this.messageListenerContainers.forEach(this::startContainer);
        return;
    }

    log.info("----- STARTING RECOVERY -----");
    this.recoveryListenerContainers.forEach(this::startContainer);
}

Я надеюсьэто кому-то полезно ...

0 голосов
/ 05 октября 2018

Это было задано до .

Не должно иметь значения, что вы используете Kafka в качестве хранилища событий, поскольку проблема заключается в повторной отправке событий микросервисами.

...