Spring Cloud Stream topi c за сообщение для разных потребителей (в одном приложении Comsumer) - PullRequest
0 голосов
/ 27 марта 2020

Этот вопрос похож на topi Spring Cloud Stream c за сообщение для разных потребителей , но разница в том, что мне нужно несколько Sinks в одном приложении Springboot для потребителей, и я хочу сделать это с помощью rabbitmq topi c (что по умолчанию в весеннем облачном потоке). Я не могу определить правильную конфигурацию или неправильный код в коде. У меня есть 3 раковины / cosumers. Потребитель1 по умолчанию, и каждое сообщение отправляется туда.

** Обновлено в соответствии с предложением Гарри **

Комментарий: в моем приложении для продюсера есть ключ маршрутизации = '*. events' application.yml

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-exchange
      rabbit:
        bindings:
          output:
            producer:
              routing-key-expression: headers['*.events']
  application:
    name: publisher-service
server:
  port: 15010

Фрагмент кода производителя Комментарий: сообщение отправлено с ключом маршрутизации = "test.events". Я не уверен во втором аргументе, но я предполагаю, что это - bindingrouting-key = test1.events.billing, что означает, что я хочу, чтобы он был доставлен потребителю биллинга помимо потребителя по умолчанию.

 source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
                    .setHeader("*.events", "test1.events.billing")
                    .build());

Конфигурация потребителя Комментарий: Я хочу назначить 3 очереди для exchange = "myexchange". Я не уверен, если конфиг правильный. application.yml

spring:
  cloud:
      stream:
        bindings:
          defaultconsumer:
            destination: my-exchange
            group: queue1
          billingconsumer:
            destination: my-exchange
            group: queue2
          messageconsumer:
            destination: my-exchange
            group: queue3

        rabbit:
          bindings:
            defaultconsumer:
              consumer:
                bindingRoutingKey: '*.events.#'
            billingconsumer:
              consumer:
                bindingRoutingKey: test1.events.billing
            messageconsumer:
              consumer:
                bindingRoutingKey: test2.events.messages

  application:
    name: subscriber-service
server:
  port: 15020

Код потребителя: IEventConsumer. java Комментарий: я не уверен, что приведенный ниже код верен

public interface IEventConsumer {
     String INPUT = "my-exchange";

    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();
}

EventConsumer. java Комментарий: все со стороны ниже это сообщение не должно быть получено моим messsageConsumer! Но на самом деле это происходит с помощью всех этих методов.



    @StreamListener("defaultconsumer")
    public void subscribe1(EventMessage eventMessage) {
        logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }


   @StreamListener("billingconsumer")
    public void subscribe2(EventMessage eventMessage) {
        logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("messageconsumer")
    public void subscribe3(EventMessage eventMessage) {
        logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

Видимо, что-то не так наверху, и я не вижу этой работы. Есть идеи?

1 Ответ

2 голосов
/ 27 марта 2020
    @Input(INPUT)
    SubscribableChannel defaultconsumer();

    @Input(INPUT)
    SubscribableChannel billingconsumer();

    @Input(INPUT)
    SubscribableChannel messageconsumer();

Вы присваиваете всем трем привязкам одинаковое имя; просто используйте @INPUT, и имя метода будет использоваться в качестве имени привязки.

И

@StreamListener("defaultconsumer")

et c.

РЕДАКТИРОВАТЬ

Я просто скопировал ваш код, и он работал нормально ...

@SpringBootApplication
@EnableBinding({ IEventConsumer.class, Source.class })
public class So60879187Application {

    private static final Logger logger = LoggerFactory.getLogger(So60879187Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So60879187Application.class, args);
    }

    @StreamListener("defaultconsumer")
    public void subscribe1(String eventMessage) {
        logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("billingconsumer")
    public void subscribe2(String eventMessage) {
        logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("messageconsumer")
    public void subscribe3(String eventMessage) {
        logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> output.send(MessageBuilder.withPayload("foo")
                .setHeader("*.events", "test1.events.billing")
                .build());
    }

}

interface IEventConsumer {
    String INPUT = "my-exchange";

    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();

}
spring:
  cloud:
      stream:
        bindings:
          defaultconsumer:
            destination: my-exchange
            group: queue1
          billingconsumer:
            destination: my-exchange
            group: queue2
          messageconsumer:
            destination: my-exchange
            group: queue3
          output:
            destination: my-exchange

        rabbit:
          bindings:
            defaultconsumer:
              consumer:
                bindingRoutingKey: '*.events.#'
            billingconsumer:
              consumer:
                bindingRoutingKey: test1.events.billing
            messageconsumer:
              consumer:
                bindingRoutingKey: test2.events.messages
            output:
              producer:
                routing-key-expression: headers['*.events']

  application:
    name: subscriber-service
server:
  port: 15020

и

2020-03-27 09:45:33.607  INFO 30366 --- [change.queue1-1] com.example.demo.So60879187Application   
  :  DefaultEventConsumer received new event [foo] 
2020-03-27 09:45:33.607  INFO 30366 --- [change.queue2-1] com.example.demo.So60879187Application   
  :  billingEventConsumer received new event [foo] 

enter image description here

EDIT2

Более новая функциональная модель программирования, эквивалентная ...

@SpringBootApplication
public class So608791871Application {

    private static final Logger logger = LoggerFactory.getLogger(So608791871Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So608791871Application.class, args);
    }

    @Bean
    public Consumer<String> defaultconsumer() {
        return eventMessage ->
                logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public Consumer<String> billingconsumer() {
        return eventMessage ->
                logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public Consumer<String> messageconsumer() {
        return eventMessage ->
                logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    private final DirectProcessor<Message<?>> output = DirectProcessor.create();

    @Bean
    public Supplier<Flux<Message<?>>> output() {
        return () -> this.output;
    }

    @Bean
    public ApplicationRunner runner() {
        Message<String> msg1 = MessageBuilder.withPayload("foo")
                .setHeader("*.events", "test1.events.billing")
                .build();
        Message<String> msg2 = MessageBuilder.withPayload("bar")
                .setHeader("*.events", "test2.events.messages")
                .build();
        return args -> {
            this.output.onNext(msg1);
            this.output.onNext(msg2);
        };
    }

}
spring:
  cloud:
    function:
      definition: defaultconsumer;billingconsumer;messageconsumer;output
    stream:
      bindings:
        defaultconsumer-in-0:
          destination: my-exchange
          group: queue1
        billingconsumer-in-0:
          destination: my-exchange
          group: queue2
        messageconsumer-in-0:
          destination: my-exchange
          group: queue3
        output-out-0:
          destination: my-exchange

      rabbit:
        bindings:
          defaultconsumer-in-0:
            consumer:
              bindingRoutingKey: '*.events.#'
          billingconsumer-in-0:
            consumer:
              bindingRoutingKey: test1.events.billing
          messageconsumer-in-0:
            consumer:
              bindingRoutingKey: test2.events.messages
          output-out-0:
            producer:
              routing-key-expression: headers['*.events']

  application:
    name: subscriber-service
server:
  port: 15020

и

2020-03-27 14:28:37.426  INFO 3646 --- [change.queue3-1] com.example.demo.So608791871Application
  :  messageEventConsumer received new event [bar] 
2020-03-27 14:28:37.426  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
  :  DefaultEventConsumer received new event [foo] 
2020-03-27 14:28:37.426  INFO 3646 --- [change.queue2-1] com.example.demo.So608791871Application
  :  billingEventConsumer received new event [foo] 
2020-03-27 14:28:37.429  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
  :  DefaultEventConsumer received new event [bar] 
...