Вручную признать, что событие Кафки A потребляет после создания события B - PullRequest
1 голос
/ 20 февраля 2020

У меня есть случай, когда мне нужно использовать событие A и выполнить некоторую обработку, а затем создать событие B. Поэтому моя проблема заключается в том, что произойдет, если обработка завершится сбоем, и приложение не сможет выдать B, пока оно уже использует A. Мой подход заключается в том, чтобы после успешной публикации B подтвердить, правильно ли я поступил или мне нужно другое решение для этого случая?

@KafkaListener(
        id = TOPIC_ID,
        topics = TOPIC_ID,
        groupId = GROUP_ID,
        containerFactory = LISTENER_CONTAINER_FACTORY
)
public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {

    try {
        final AEvent aEvent = messages.stream()
                .filter(message -> null != message.getPayload())
                .map(Message::getPayload)
                .findFirst()
                .get();

        processDao.doSomeProcessing() // returns a Mono<Example> by calling an externe API
                .subscribe(
                        response -> {
                            ProducerRecord<String, BEvent> BEventRecord = new ProducerRecord<>(TOPIC_ID, null, BEvent);

                            ListenableFuture<SendResult<String, BEvent>> future = kafkaProducerTemplate.send(buildBEvent());
                            future.addCallback(new ListenableFutureCallback<SendResult<String, BEvent>>() {
                                @Override
                                public void onSuccess(SendResult<String, BEvent> BEventSendResult) {
                                    //TODO: do when event published successfully
                                }

                                @Override
                                public void onFailure(Throwable exception) {
                                    exception.printStackTrace();
                                    throw new ExampleException();
                                }
                            });
                        },
                        error -> {
                            error.printStackTrace();
                            throw new ExampleException();
                        }
                );
        acknowledgment.acknowledge(); // ??
    } catch (ExampleException) {
        exception.printStackTrace();
    }
}

1 Ответ

1 голос
/ 20 февраля 2020

Вы не можете управлять "подтверждениями" kafka при использовании асинхронного кода c, такого как реактор.

Kafka не управляет дискретными подтверждениями для каждой темы / раздела, а только последним принятым смещением для раздела.

Если вы обрабатываете две записи асинхронно, у вас будет гонка относительно того, какое смещение будет зафиксировано первым.

Вам необходимо выполнить посылку в потоке контейнера слушателя для поддержания правильного порядка.

...