Я пытаюсь чего-то добиться с помощью REST-сервиса с весенней загрузкой и kafka. Я хочу динамически создать прослушиватель kafka topi c, когда я получаю запрос REST, прослушивать эти topi c в течение определенного периода времени, скажем, 10 секунд, отфильтровывать сообщения на основе некоторого значения в сообщении ( RecordFilterStrategy, я думаю?) И поток сообщений обратно к потребителю REST. Возможно ли это с Kafka?
Мне удалось динамически создать прослушиватель для topi c, но я не вижу, как добавить к нему фильтр записи или как я могу передать их обратно вызывающей стороне.
Вот так я динамически создавал слушателя для топи c:
public ConcurrentMessageListenerContainer getConsumer(String topic, MessageListener<String, String> listener) {
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
consumerConfig);
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(listener);
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(
kafkaConsumerFactory,
containerProperties);
return container;
}
Может кто-нибудь указать мне какую-нибудь документацию о том, как этого добиться? Я создал отображение GET, которое может возвращать поток сообщений, но я не знаю, как вернуть сообщения в topi c. Я знаю, что приведенное ниже неверно, но код прослушивателя сообщений получен.
@GetMapping(path = "/v1/messages", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> getMessages(@RequestParam String id) {
MessageListener<String, String> listener = new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> arg0) {
logger.debug("We got a message:" + arg0);
}
};
ConcurrentMessageListenerContainer consumer = consumerFactory.getConsumer("mytopic", listener);
consumer.start();
//return messages here
}