Есть ли @KafkaListener в Reactor / Reactive Kafka? - PullRequest
1 голос
/ 09 апреля 2019

Я хочу написать Reactive Kafa для прослушивания нового приходящего сообщения.Но я не знаю, как это сделать.Как @KafkaListener в блокировании Kafka - он ждет нового сообщения

Демонстрация кода для Spring Boot Webflux и Reactor Kafka:

public class KafkaConsumer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "Kafka_Example";

    private final ReceiverOptions<String, String> receiverOptions;

    public KafkaConsumer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        receiverOptions = ReceiverOptions.create(props);

    }

    public Disposable consumeMessages(String topic) {

        ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> System.out.println("onPartitionsAssigned " + partitions))
                .addRevokeListener(partitions -> System.out.println("onPartitionsRevoked " + partitions));

        Flux<ReceiverRecord<String, String>> kafkaFlux = KafkaReceiver.create(options).receive();
        return kafkaFlux.subscribe(record -> {
            ReceiverOffset offset = record.receiverOffset();
            System.out.printf("Received message: offset=%d key=%d value=%s\n", offset.offset(), record.key(), record.value());
            offset.acknowledge();
        });
    }

    public static void main(String[] args) throws Exception {
        KafkaConsumer consumer = new KafkaConsumer(BOOTSTRAP_SERVERS);
        consumer.consumeMessages(TOPIC);
    }
}

Он работает и останавливается.Хотелось бы, чтобы всегда ждали нового сообщения.

1 Ответ

0 голосов
/ 28 июня 2019
package com.simplest.kafkaconsumer.services;



import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

@Component
public class ReactorKafkaReceiver {

    private static final Logger log = LoggerFactory.getLogger(ReactorKafkaReceiver.class.getName());

    private KafkaReceiver kafkaReceiver;

    public ReactorKafkaReceiver() {


        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("test"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        kafkaReceiver = KafkaReceiver.create(consumerOptions);

        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                    System.out.println(r.value());
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }

}

Если вы хотите проверить: 1 - startzookeeper (настройки по умолчанию) 2 - запустить сервер kafka (настройки по умолчанию) 3 - создать тему с именем test 4 - создать простое сообщение 5 - создать простой проект в весеннем инициализатореи добавить зависимость реактор-кафка и, соответственно, соответственно добавить службу выше

Я подумал, должен ли я вставить в качестве ответа, потому что ваш вопрос в теме: есть ли @KafkaListener в Reactor / Reactive Kafka?но после прочтения дважды вашего вопроса кажется, что вы в основном сталкиваетесь с проблемой, чтобы продолжать слушать (он работает и останавливается. Я хотел бы, чтобы он всегда ждал нового сообщения.)

...