Я пытаюсь сделать следующее: когда я получаю сообщение от потребителя, я фильтрую это сообщение, основываясь на его содержимом, и фильтр отправит сообщения в 3 разные очереди с помощью трех разных производителей.
это конфигурация для трех очередей:
cloud:
stream:
bindings:
QuickScore:
concurrency: 5
destination: quickScore
content-type: application/json
group: quickScoreGroup
maxConcurrency: 10
recoveryInterval: 10000
SuitabilityScore:
concurrency: 5
destination: suitabilityScore
content-type: application/json
group: suitabilityScore
maxConcurrency: 10
recoveryInterval: 10000
CompletenessScore:
concurrency: 5
destination: completenessScore
content-type: application/json
group: completenessScore
maxConcurrency: 10
recoveryInterval: 10000
rabbitmq:
host: ${rabbitmq.host:localhost}
username: guest
password: guest
port: 5672
Пользовательский канал для очередей
public interface CustomChannels {
@Output("QuickScore")
MessageChannel publishMessageToQuickScore();
@Input("QuickScore")
SubscribableChannel receivedAtQuickScore();
@Output("CompletenessScore")
MessageChannel publishMessageToCompletenessScore();
@Input("CompletenessScore")
SubscribableChannel receivedAtCompletenessScore();
@Output("SuitabilityScore")
MessageChannel publishMessageToSuitabilityScore();
@Input("SuitabilityScore")
SubscribableChannel receivedAtSuitabilityScore();
}
производители для очередей:
@Autowired
private CustomChannels customChannels;
public void sendToQuickScore(UpdatedFieldsEntity updatedFieldsEntity) {
customChannels
.publishMessageToQuickScore().send(MessageBuilder.withPayload(updatedFieldsEntity).build());
log.info("sending to Quick score" + updatedFieldsEntity.toString());
}
public void sendToCompletenessScore(UpdatedFieldsEntity updatedFieldsEntity) {
customChannels
.publishMessageToCompletenessScore()
.send(MessageBuilder.withPayload(updatedFieldsEntity).build());
log.info("sending to completeness score" + updatedFieldsEntity.toString());
}
public void sendToSuitabilityScore(UpdatedFieldsEntity updatedFieldsEntity) {
customChannels
.publishMessageToSuitabilityScore()
.send(MessageBuilder.withPayload(updatedFieldsEntity).build());
log.info("sending to suitability score" + updatedFieldsEntity.toString());
}
}
И вот как я фильтрую и публикую в разных очередях:
@Autowired
private EventProducer producer;
public UpdatedFieldsEntity CheckUpdatedKey(UpdatedFieldsEntity updatedFieldsEntity)
throws RezoomexException {
logger.info("\n\n Checking UpdateKeys " + updatedFieldsEntity.toString());
if (updatedFieldsEntity == null) {
RezoomexException exception = new RezoomexException("update message is null");
throw exception;
}
for (UpdatedFields updatedFields : updatedFieldsEntity.getUpdatedFields()) {
UpdateKey element = updatedFields.getUpdateKey();
if (element.toString().equals(TECHNOLOGIES_UNDER_SKILLSET) || element.toString()
.equals(TOTAL_EXPERIENCE_VALUE)
|| element.toString().equals(TECHNOLOGIES) || element.toString()
.equals(TOTAL_EXPERIENCE_SUFFIX)) {
IsThreeScores = true;
}
}
if (IsThreeScores == true) {
logger.info("\n\n\n publishing message to all Q");
producer.sendToQuickScore(updatedFieldsEntity);
producer.sendToSuitabilityScore(updatedFieldsEntity);
producer.sendToCompletenessScore(updatedFieldsEntity);
IsThreeScores = false;
} else {
logger.info("\n\n\n publishing message to 2 Q");
producer.sendToSuitabilityScore(updatedFieldsEntity);
producer.sendToCompletenessScore(updatedFieldsEntity);
}
return updatedFieldsEntity;
}
}
В самый первый раз все очереди потребляют сообщение , но во второй раз любая из трех очередей выдает исключение как:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
... 43 common frames omitted