Этот вопрос похож на topi Spring Cloud Stream c за сообщение для разных потребителей , но разница в том, что мне нужно несколько Sinks в одном приложении Springboot для потребителей, и я хочу сделать это с помощью rabbitmq topi c (что по умолчанию в весеннем облачном потоке). Я не могу определить правильную конфигурацию или неправильный код в коде. У меня есть 3 раковины / cosumers. Потребитель1 по умолчанию, и каждое сообщение отправляется туда.
** Обновлено в соответствии с предложением Гарри **
Комментарий: в моем приложении для продюсера есть ключ маршрутизации = '*. events' application.yml
spring:
cloud:
stream:
bindings:
output:
destination: my-exchange
rabbit:
bindings:
output:
producer:
routing-key-expression: headers['*.events']
application:
name: publisher-service
server:
port: 15010
Фрагмент кода производителя Комментарий: сообщение отправлено с ключом маршрутизации = "test.events". Я не уверен во втором аргументе, но я предполагаю, что это - bindingrouting-key = test1.events.billing, что означает, что я хочу, чтобы он был доставлен потребителю биллинга помимо потребителя по умолчанию.
source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
.setHeader("*.events", "test1.events.billing")
.build());
Конфигурация потребителя Комментарий: Я хочу назначить 3 очереди для exchange = "myexchange". Я не уверен, если конфиг правильный. application.yml
spring:
cloud:
stream:
bindings:
defaultconsumer:
destination: my-exchange
group: queue1
billingconsumer:
destination: my-exchange
group: queue2
messageconsumer:
destination: my-exchange
group: queue3
rabbit:
bindings:
defaultconsumer:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer:
consumer:
bindingRoutingKey: test2.events.messages
application:
name: subscriber-service
server:
port: 15020
Код потребителя: IEventConsumer. java Комментарий: я не уверен, что приведенный ниже код верен
public interface IEventConsumer {
String INPUT = "my-exchange";
@Input
SubscribableChannel defaultconsumer();
@Input
SubscribableChannel billingconsumer();
@Input
SubscribableChannel messageconsumer();
}
EventConsumer. java Комментарий: все со стороны ниже это сообщение не должно быть получено моим messsageConsumer! Но на самом деле это происходит с помощью всех этих методов.
@StreamListener("defaultconsumer")
public void subscribe1(EventMessage eventMessage) {
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("billingconsumer")
public void subscribe2(EventMessage eventMessage) {
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("messageconsumer")
public void subscribe3(EventMessage eventMessage) {
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
Видимо, что-то не так наверху, и я не вижу этой работы. Есть идеи?