Привязать потребителя RabbitMQ с помощью Spring Cloud Stream к производителю RabbitMQ - PullRequest
0 голосов
/ 29 октября 2018

У меня есть два микросервиса, один для сбора XML-файлов с внутреннего FTP-сервера, преобразования их в объекты DTO и последующей публикации их в виде байтов в RabbitMQ, а другой для десериализации входящих байтов из RabbitMQ в объекты DTO, сопоставления их с сущностями JPA и сохраняя их в базе данных.

Я бы хотел настроить брокера RabbitMQ между этими двумя микросервисами, как показано ниже:

1) для микросервиса, который собирает XML-файлы, я отредактировал в application.properties, как показано ниже:

spring.cloud.stream.bindings.output.destination=TOPIC
spring.cloud.stream.bindings.output.group=proactive-policy

2) для микросервиса, в котором сохраняются входящие объекты DTO, я настроил в application.properties следующее:

spring.cloud.stream.bindings.input.destination=TOPIC
spring.cloud.stream.bindings.input.group=proactive-policy 

Для получения входящих байтов от RabbitMQ я использую второй микросервис в качестве приемника:

@EnableJpaAuditing
@EnableBinding(Sink.class)
@SpringBootApplication(scanBasePackages = { "org.proactive.policy.data.cache" })
@RefreshScope
public class ProactivePolicyDataCacheApplication {
    private static Logger logger = LoggerFactory.getLogger(ProactivePolicyDataCacheApplication.class);

    @Autowired
    PolicyService policyService;

    public static void main(String[] args) {
        SpringApplication.run(ProactivePolicyDataCacheApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void input(Message<byte[]> message) throws Exception {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            logger.error("the message is null ");
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        byte[] data = message.getPayload();
        if (data.length == 0) {
            logger.warn("Received empty message");
            return;
        }
        logger.info("Got data from policy-collector = " + new String(data, "UTF-8"));
        PolicyListDto policyListDto = (PolicyListDto) SerializationUtils.deserialize(data);
        logger.info("Policies.xml from policy-collector = " + policyListDto.getPolicy().toString());

        policyService.save(policyListDto);
    }

}

Но когда я открываю консоль RabbitMQ для просмотра бирж, я ничего не получаю в очереди TOPIC.proactive-policy Но входящие сообщения принимаются в другой очереди, которую я не настроил его с именем FTPSTREAM.proactive-policy-collector

Есть ли какие-либо предложения по решению этой проблемы

1 Ответ

0 голосов
/ 29 октября 2018

Пара баллов: 1. Нет такой вещи, как «группа» для выходной привязки. Consumer Group является потребительской собственностью. Вот фрагмент Javadocs.

/**
 * Unique name that the binding belongs to (applies to consumers only). Multiple
 * consumers within the same group share the subscription. A null or empty String
 * value indicates an anonymous group that is not shared.
 * @see org.springframework.cloud.stream.binder.Binder#bindConsumer(java.lang.String,
 * java.lang.String, java.lang.Object,
 * org.springframework.cloud.stream.binder.ConsumerProperties)
 */
private String group;

2. Имя 'FTPSTREAM.proactive-policy-collector' определенно не является чем-то, что генерируется весенним облачным потоком, поэтому подумайте о том, чтобы просмотреть свою конфигурацию и посмотреть, что вы пропустили.

Это говорит мне, что у вас есть какой-то потребитель, чье «назначение» имеет имя FTPSTREAM и его «группа» proactive-policy-collector . Также сообщается, что ваш производитель отправляет сообщения на обмен FTPSTREAM .

...