У меня есть случай, когда мне нужно использовать событие A и выполнить некоторую обработку, а затем создать событие B. Поэтому моя проблема заключается в том, что произойдет, если обработка завершится сбоем, и приложение не сможет выдать B, пока оно уже использует A. Мой подход заключается в том, чтобы после успешной публикации B подтвердить, правильно ли я поступил или мне нужно другое решение для этого случая?
@KafkaListener(
id = TOPIC_ID,
topics = TOPIC_ID,
groupId = GROUP_ID,
containerFactory = LISTENER_CONTAINER_FACTORY
)
public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {
try {
final AEvent aEvent = messages.stream()
.filter(message -> null != message.getPayload())
.map(Message::getPayload)
.findFirst()
.get();
processDao.doSomeProcessing() // returns a Mono<Example> by calling an externe API
.subscribe(
response -> {
ProducerRecord<String, BEvent> BEventRecord = new ProducerRecord<>(TOPIC_ID, null, BEvent);
ListenableFuture<SendResult<String, BEvent>> future = kafkaProducerTemplate.send(buildBEvent());
future.addCallback(new ListenableFutureCallback<SendResult<String, BEvent>>() {
@Override
public void onSuccess(SendResult<String, BEvent> BEventSendResult) {
//TODO: do when event published successfully
}
@Override
public void onFailure(Throwable exception) {
exception.printStackTrace();
throw new ExampleException();
}
});
},
error -> {
error.printStackTrace();
throw new ExampleException();
}
);
acknowledgment.acknowledge(); // ??
} catch (ExampleException) {
exception.printStackTrace();
}
}