У меня есть такой сценарий, который преследует меня: тема Кафки с сообщениями, которые я должен потреблять и открывать через SSE для веб-компонента. Я выполнил несколько предварительных предположений на всех уровнях в поисках стабильного и более надежного подхода или, по крайней мере, некоторых, которые мне удобнее поддерживать.
Теперь я создаю очень простую тему Кафки и создаю двух разных потребителей, и оба, кажется, работают примерно одинаково. Один из них использует org.springframework.kafka.annotation.KafkaListener, а другой реактор.kafka.receiver.KafkaReceiver.
Конечная цель - выставить через Spring WebFlux событие, отправляющее сообщение, когда получатель из темы.
Если я не ошибаюсь, я где-то читал, что Spring..KafkaListener блокирует код, но, насколько я вижу, это не так. Просто слушатель был запущен точно как Reactor..KafkaReceiver. Поскольку я кодирую неблокирующий код, я должен избегать блокирования кода, но я не могу блокировать Spring..KafkaListener в любом месте.
Вот базовое сравнение, результатом которого является точно такой же результат:
Реактор KafkaReceiver:
ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
.subscription(Collections.singleton("test"))
.addAssignListener(partitions -> logger.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> logger.debug("onPartitionsRevoked {}", partitions));
kafkaReceiver = KafkaReceiver.create(consumerOptions);
((Flux<ReceiverRecord>) kafkaReceiver.receive()).doOnNext(r -> {
logger.info(String.format("Consumed Message using KafkaListener -> %s", r.value()));
r.receiverOffset().acknowledge();
}).subscribe();
Spring Kafka Listener:
@KafkaListener(topics = "test")
public void consume(String message) {
logger.info(String.format("Consumed Message using KafkaListener -> %s", message));
}
Если вы хотите воспроизвести сравнение:
1 - clone https://github.com/jimisdrpc/simplest-comparison-kafkaconsumer
2 - create default Kafka topic name test
3 - produce any message to such topic
4 - enable either Reactor Kafka and run again after enable Kafka Listener (I didn't find how make both work in same application at same moment but let's say it isn't that important now)
PS: я не спрашиваю, что лучше. Я пытаюсь сравнить для очень специфического сценария (реактивное программирование, направленное на создание событий для веб-компонента через Spring WebFlux) и пытаюсь гарантировать, что я не выбираю тот, который может каким-то образом блокировать код.