мы используем Spring Cloud Stream с привязкой Kakfa, и в нашем проекте мы предусмотрели следующие два класса для реализации базового потребителя:
MyConsumer ==> StreamConsumer
Где StreamConsumer:
public class StreamConsumer {
@StreamListener(EventSink.INPUT0)
public void handleMessages(Message<Notification> message) {
handle(message); // here we call to consumer registered functions
}
}
As of doc @StreamListener:
Annotation that marks a method to be a listener to inputs declared via
EnableBinding (e.g. channels).
И MyConsumer:
@EnableBinding(EventSink.class)
public class MyConsumer extends StreamConsumer {
--we declare functional interfaces (to consume messages) to be register at StreamConsumer
and managed by the handle function in order to be call back--
}
As of doc, @EnableBinding:
Enables the binding of targets annotated with Input and Output to a broker,
according to the list of interfaces passed as value to the annotation.
Интерфейс EventSink:
public interface EventSink {
String INPUT0 = "input0";
String INPUT1 = "input1";
String INPUT2 = "input2";
@Input(INPUT0)
SubscribableChannel input0();
@Input(INPUT1)
SubscribableChannel input1();
@Input(INPUT2)
SubscribableChannel input2();
}
Application.yml:
cloud:
stream:
bindings:
input0:
group: CommonConsumer
destination: ${EXTERNALSERVICES_QUEUE:externalservices}
binder: kafka
input1:
group: UpdaterConsumer
destination: ${EXTERNALSERVICES_QUEUE:externalservices}
binder: kafka
input2:
group: DeleterConsumer
destination: ${EXTERNALSERVICES_QUEUE:externalservices}
binder: kafka
binders:
kafka:
type: kafka
environment:
spring:
kafka:
brokers: ${KAFKA_HOST:127.0.0.1}
AsВы можете видеть, что 3 входа имеют одно и то же назначение, потому что мы хотим иметь 3 (динамическое число из них) потребителей для совместного использования одного и того же канала, но мы не хотим, чтобы они конкурировали за сообщения и поэтому мы хотимиметь разные группы (все сообщения будут распространяться среди всех потребителей)
Есть ли способ объявить в базовом классе StreamConsumer метод для прослушивания сообщений с @StreamListener, но динамически выбирающий вход?Насколько я знаю, для @StreamListener или @Input всегда требуется литерал в качестве параметра (он может поступать из интерфейса EvenSink, прямого литерала, свойства приложения, SPEL для управления разделами)
Или другая возможность,можем ли мы динамически изменить группу внутри потребителя?
Я также проверил опцию для получения потребителя, как указано в документе:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC4/single/spring-cloud-stream-binder-kafka.html#pause-resume
, ноConsumer
не имеет доступа к групповым свойствам.Я предполагаю, что в @StreamListener привязка уже установлена, и затем посредник выбрал определенную (возможно, неизменяемую) группу.
Любая помощь приветствуется.