Потоковые сообщения Kafka в конечной точке REST - PullRequest
0 голосов
/ 20 февраля 2020

Я пытаюсь чего-то добиться с помощью REST-сервиса с весенней загрузкой и kafka. Я хочу динамически создать прослушиватель kafka topi c, когда я получаю запрос REST, прослушивать эти topi c в течение определенного периода времени, скажем, 10 секунд, отфильтровывать сообщения на основе некоторого значения в сообщении ( RecordFilterStrategy, я думаю?) И поток сообщений обратно к потребителю REST. Возможно ли это с Kafka?

Мне удалось динамически создать прослушиватель для topi c, но я не вижу, как добавить к нему фильтр записи или как я могу передать их обратно вызывающей стороне.

Вот так я динамически создавал слушателя для топи c:

public ConcurrentMessageListenerContainer getConsumer(String topic, MessageListener<String, String> listener) {
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerConfig);

        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(listener);


        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
                kafkaConsumerFactory,
                containerProperties);
return container;
}

Может кто-нибудь указать мне какую-нибудь документацию о том, как этого добиться? Я создал отображение GET, которое может возвращать поток сообщений, но я не знаю, как вернуть сообщения в topi c. Я знаю, что приведенное ниже неверно, но код прослушивателя сообщений получен.

@GetMapping(path = "/v1/messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Message> getMessages(@RequestParam String id) {

    MessageListener<String, String> listener = new MessageListener<String, String>() {

        @Override
        public void onMessage(ConsumerRecord<String, String> arg0) {
            logger.debug("We got a message:" + arg0);

        }
    };

    ConcurrentMessageListenerContainer consumer = consumerFactory.getConsumer("mytopic", listener);
    consumer.start();

  //return messages here

}

Ответы [ 2 ]

1 голос
/ 21 февраля 2020

Так как вы возвращаете Flux, возможно, будет хорошей идеей взглянуть на https://projectreactor.io/docs/kafka/release/reference/

и использовать реактивный приемник, который возвращает поток, который вы можете отобразить

Flux<ReceiverRecord<Integer, String>> inboundFlux =
Receiver.create(receiverOptions)
        .receive();

Или, глядя на свой код, вы можете просто вытащить их все, отобразить и построить поток, но это будет блокировка.

0 голосов
/ 20 февраля 2020

При использовании собственной реализации MessageListener вы должны выполнить фильтрацию самостоятельно. RecordFilterStrategy используется FilteringMessageListenerAdapter при использовании созданных Spring слушателей для методов @RabbitListener.

...