Как динамически выбрать канал ввода для метода слушателя в @StreamListener - PullRequest
0 голосов
/ 27 июня 2019

мы используем Spring Cloud Stream с привязкой Kakfa, и в нашем проекте мы предусмотрели следующие два класса для реализации базового потребителя:

MyConsumer ==> StreamConsumer

Где StreamConsumer:

    public class StreamConsumer {

        @StreamListener(EventSink.INPUT0)
        public void handleMessages(Message<Notification> message) {
            handle(message); // here we call to consumer registered functions
        }
    }
As of doc @StreamListener:

Annotation that marks a method to be a listener to inputs declared via 
EnableBinding (e.g. channels).

И MyConsumer:

    @EnableBinding(EventSink.class)
    public class MyConsumer extends StreamConsumer {

        --we declare functional interfaces (to consume messages) to be register at StreamConsumer
          and managed by the handle function in order to be call back--
    }
As of doc, @EnableBinding:

Enables the binding of targets annotated with Input and Output to a broker,
according to the list of interfaces passed as value to the annotation.

Интерфейс EventSink:

    public interface EventSink {

        String INPUT0 = "input0";
        String INPUT1 = "input1";
        String INPUT2 = "input2";

        @Input(INPUT0)
        SubscribableChannel input0();

        @Input(INPUT1)
        SubscribableChannel input1();

        @Input(INPUT2)
        SubscribableChannel input2();   
    }

Application.yml:

      cloud:
        stream:
          bindings:
            input0:
              group: CommonConsumer
              destination: ${EXTERNALSERVICES_QUEUE:externalservices}
              binder: kafka
            input1:
              group: UpdaterConsumer
              destination: ${EXTERNALSERVICES_QUEUE:externalservices}
              binder: kafka
            input2:
              group: DeleterConsumer
              destination: ${EXTERNALSERVICES_QUEUE:externalservices}
              binder: kafka
          binders:
              kafka:
                type: kafka
                environment:
                  spring:
                    kafka:
                      brokers: ${KAFKA_HOST:127.0.0.1}

AsВы можете видеть, что 3 входа имеют одно и то же назначение, потому что мы хотим иметь 3 (динамическое число из них) потребителей для совместного использования одного и того же канала, но мы не хотим, чтобы они конкурировали за сообщения и поэтому мы хотимиметь разные группы (все сообщения будут распространяться среди всех потребителей)

Есть ли способ объявить в базовом классе StreamConsumer метод для прослушивания сообщений с @StreamListener, но динамически выбирающий вход?Насколько я знаю, для @StreamListener или @Input всегда требуется литерал в качестве параметра (он может поступать из интерфейса EvenSink, прямого литерала, свойства приложения, SPEL для управления разделами)

Или другая возможность,можем ли мы динамически изменить группу внутри потребителя?

Я также проверил опцию для получения потребителя, как указано в документе:

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC4/single/spring-cloud-stream-binder-kafka.html#pause-resume

, ноConsumer не имеет доступа к групповым свойствам.Я предполагаю, что в @StreamListener привязка уже установлена, и затем посредник выбрал определенную (возможно, неизменяемую) группу.

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 27 июня 2019

Я думаю, что вам нужна маршрутизация на основе контента, где сообщение может быть делегировано определенному слушателю. Если это так, вы можете использовать атрибут condition для StreamListener. Подробнее ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...