Тема Spring Cloud Stream на сообщение для разных потребителей - PullRequest
0 голосов
/ 05 мая 2018

Топология, которую я ищу,

enter image description here

До сих пор я не видел способа определения темы для каждого сообщения в Cloud Stream. Я понимаю, что потребители будут привязаны к определенной теме, но как производитель устанавливает тему для сообщения перед отправкой сообщения на биржу?

source.output().send(MessageBuilder.withPayload(myMessage).build());

Не предоставляет какого-либо способа задать тему для обмена маршрутом к нужному потребителю.

А может я что-то не правильно понимаю?

UPDATE

Я бы не ожидал, что получатель получит сообщение из-за того, что bindingRoutingKey составляет 2222, и я отправляю с routeTo 1111. Но я все еще получаю это на потребителя.

Свойства производителя:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output.destination=messageExchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']


@EnableBinding(Source.class)
@SpringBootApplication
public class Application {

   public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

}

Отправитель:

source.output().send(MessageBuilder.withPayload(mo).setHeader("routeTo", "1111").build());

А Потребитель:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.input.destination=messageExchange
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=2222

Применение:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
}

@StreamListener(Sink.INPUT)
public void ReceiveMo(String moDTO) {
    log.info("Message received moDTO: {}", moDTO);
}

}

ВТОРОЕ ОБНОВЛЕНИЕ

С предложениями в принятом ответе ниже. Я был в состоянии заставить это работать. Требуется удалить обмены и очереди из RabbitMQ с помощью его пользовательского интерфейса и перезапустить образ докера RabbitMQ.

1 Ответ

0 голосов
/ 06 мая 2018

Собственность производителя routingKeyExpression rabbitmq .

например. ...producer.routing-key-expression=headers['routeTo']

тогда

source.output().send(MessageBuilder.withPayload(myMessage)
    .setHeader("routeTo", "Booking.new")
    .build());

Обратите внимание, что местом назначения является обменное имя. По умолчанию связыватель ожидает обмен Topic. Если вы хотите использовать прямой обмен вместо этого, вы должны установить свойство exchangeType.

...