Я установил три очереди rabbitmq с весенним облаком, одна из очередей случайно выходит из строя - PullRequest
0 голосов
/ 23 ноября 2018

Я пытаюсь сделать следующее: когда я получаю сообщение от потребителя, я фильтрую это сообщение, основываясь на его содержимом, и фильтр отправит сообщения в 3 разные очереди с помощью трех разных производителей.

это конфигурация для трех очередей:

  cloud:
    stream:
      bindings:
        QuickScore:
          concurrency: 5
          destination: quickScore
          content-type: application/json
          group: quickScoreGroup
          maxConcurrency: 10
          recoveryInterval: 10000
        SuitabilityScore:
          concurrency: 5
          destination: suitabilityScore
          content-type: application/json
          group: suitabilityScore
          maxConcurrency: 10
          recoveryInterval: 10000
        CompletenessScore:
          concurrency: 5
          destination: completenessScore
          content-type: application/json
          group: completenessScore
          maxConcurrency: 10
          recoveryInterval: 10000

  rabbitmq:
    host: ${rabbitmq.host:localhost}
    username: guest
    password: guest
    port: 5672

Пользовательский канал для очередей

public interface CustomChannels {

  @Output("QuickScore")
  MessageChannel publishMessageToQuickScore();

  @Input("QuickScore")
  SubscribableChannel receivedAtQuickScore();

  @Output("CompletenessScore")
  MessageChannel publishMessageToCompletenessScore();

  @Input("CompletenessScore")
  SubscribableChannel receivedAtCompletenessScore();

  @Output("SuitabilityScore")
  MessageChannel publishMessageToSuitabilityScore();

  @Input("SuitabilityScore")
  SubscribableChannel receivedAtSuitabilityScore();
}

производители для очередей:

@Autowired
  private CustomChannels customChannels;

  public void sendToQuickScore(UpdatedFieldsEntity updatedFieldsEntity) {
    customChannels
        .publishMessageToQuickScore().send(MessageBuilder.withPayload(updatedFieldsEntity).build());
    log.info("sending to Quick score" + updatedFieldsEntity.toString());
  }

  public void sendToCompletenessScore(UpdatedFieldsEntity updatedFieldsEntity) {
    customChannels
        .publishMessageToCompletenessScore()
        .send(MessageBuilder.withPayload(updatedFieldsEntity).build());
    log.info("sending to completeness score" + updatedFieldsEntity.toString());
  }

  public void sendToSuitabilityScore(UpdatedFieldsEntity updatedFieldsEntity) {
    customChannels
        .publishMessageToSuitabilityScore()
        .send(MessageBuilder.withPayload(updatedFieldsEntity).build());
    log.info("sending to suitability score" + updatedFieldsEntity.toString());
  }
}

И вот как я фильтрую и публикую в разных очередях:

 @Autowired
  private EventProducer producer;

  public UpdatedFieldsEntity CheckUpdatedKey(UpdatedFieldsEntity updatedFieldsEntity)
      throws RezoomexException {
    logger.info("\n\n Checking UpdateKeys " + updatedFieldsEntity.toString());
    if (updatedFieldsEntity == null) {
      RezoomexException exception = new RezoomexException("update message is null");
      throw exception;
    }
    for (UpdatedFields updatedFields : updatedFieldsEntity.getUpdatedFields()) {
      UpdateKey element = updatedFields.getUpdateKey();
      if (element.toString().equals(TECHNOLOGIES_UNDER_SKILLSET) || element.toString()
          .equals(TOTAL_EXPERIENCE_VALUE)
          || element.toString().equals(TECHNOLOGIES) || element.toString()
          .equals(TOTAL_EXPERIENCE_SUFFIX)) {
        IsThreeScores = true;
      }

    }
    if (IsThreeScores == true) {
      logger.info("\n\n\n publishing message to all Q");
      producer.sendToQuickScore(updatedFieldsEntity);
      producer.sendToSuitabilityScore(updatedFieldsEntity);
      producer.sendToCompletenessScore(updatedFieldsEntity);
      IsThreeScores = false;
    } else {
      logger.info("\n\n\n publishing message to 2 Q");
      producer.sendToSuitabilityScore(updatedFieldsEntity);
      producer.sendToCompletenessScore(updatedFieldsEntity);
    }
    return updatedFieldsEntity;
  }
}

В самый первый раз все очереди потребляют сообщение , но во второй раз любая из трех очередей выдает исключение как:

    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception

Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
    ... 43 common frames omitted

1 Ответ

0 голосов
/ 30 ноября 2018

Проблема в том, что вы используете один и тот же канал для ввода и вывода (т. Е. Используете сообщение и размещаете сообщение в очереди из одного и того же канала).Имеется другой канал для потребления, например: -

  @Output("QuickScore")
  MessageChannel publishMessageToQuickScore();

  @Input("Score")
  SubscribableChannel receivedAtQuickScore();

В вашем коде измените имя канала на @input или @ output.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...