, поэтому мне нужно создать инструмент метаданных для нашей шины Kafka, и я сделал это с помощью обычного Kafka, и они сказали мне, чтобы повторить его с помощью Spring Boot Streams.
У меня возникли проблемы с воссозданием, и я получаю последние n сообщений.
Я не знаю, как выполнить эту задачу, поскольку topi c к хвосту может быть любая topi c в кластере Kafka, и она не известна до вызова API REST.
До сих пор я пытался использовать аннотации @StreamListener и @Input, но они быть решенным во время компиляции.
Также я пробовал следующее:
public static SubscribableChannel createChannel(String topicName) {
DirectChannel directChannel = new DirectChannel();
directChannel.setBeanName(topicName);
beanFactory.registerSingleton(topicName, directChannel);
return (SubscribableChannel) beanFactory.initializeBean(directChannel, topicName);
}
public static SubscribableChannel getChannel(String topic){
return beanFactory.getBean(topic, SubscribableChannel.class);
}
Но я не знаю, является ли это правильным способом создания SubscribeableChannel. Также я не знаю, как подключить ie мой SubscribeableChannel к topi c Kafka без использования аннотации @Input. Также я не думаю, что метод подписки () может быть использован для того, что я пытаюсь сделать.
Что мне нужно, чтобы добиться успеха, это KafkaConsumer (или эквивалентный Spring Streams-потребитель), привязанный к динамически создаваемому объект (такой как SubscribeableChannel), который принимает имя topi c в качестве параметра и связывает часть обмена сообщениями Spring Boot Streams с topi c Kafka. Имеет ли это смысл?
Также у меня есть предположение, что Spring Boot Streams используется для бесконечной потоковой передачи данных из одного места в другое и что здесь запрашивается (получение последних n сообщений от topi c в данный момент времени) не подпадает под то, что рамки должны делать. Это будет правильно?
Заранее спасибо.