Добавление пользовательских заголовков в Spring Cloud Stream (с помощью Spring Reactor) - PullRequest
0 голосов
/ 18 февраля 2020

Будучи новичком в Spring Reactor, я пытаюсь передавать данные с помощью облачного потока Spring (используя rabbitMQ). Мне нужно добавить несколько пользовательских заголовков, прежде чем сообщение будет отправлено в очередь.

Моя конфигурация Spring-Cloud-Stream:

spring:
  cloud:
    stream:
      default:
        producer:
          errorChannelEnabled: true
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

      binders:
        rabbitInput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5672
                host: localhost

        rabbitOutput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5670
                host: localhost 

Ссылка производителя:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {

    public static void main(String[] args) {
        SpringApplication.run(MessageProcessor.class, args);
    }

    @Bean
    Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
        return data -> data.map(d -> match(d, students));

    }
    private String match(String message, List<String> students){
        return Objects.isNull(message) || message.isBlank()
            ? message
            : String.valueOf(matchStudentName(message, students));
    }

    private Optional<String> matchStudentName(String message, List<String> students){
        return students.stream()
        .filter(name -> name.equals(message)).findFirst();
    }
    @Bean
    Function<Flux<String>, Flux<Message<String>>> addHeaders() {
        return data-> data.map(d-> MessageBuilder
            .withPayload( d )
            .setHeader("a", 1)
            .setHeader("b", "999")
            .build());
    }
}

Заголовки успешно добавляются в Сообщение, но это где-то переопределяется и не распространяется на потребителя.

Может, кто-нибудь поделится своими мыслями о том, как мы можем добавлять пользовательские заголовки в Сообщение с помощью Spring Cloud Stream.

Заранее спасибо !

1 Ответ

0 голосов
/ 19 февраля 2020

Пожалуйста, обновитесь до Hoxton.SR2, который принесет spring-cloud-stream 3.0.2.RELEASE. Были некоторые обновления, но короче говоря, сообщение, которое вы создаете, и заголовок в нем должны быть сохранены.

Примечание: Кроме того, из-за добавленной поддержки нескольких аргументов функции входа / выхода нам пришлось обновить соглашение об именах привязки для функций. Вы можете прочитать больше об этом здесь , но это означает, что ваша конфигурация нуждается в быстром обновлении, так как input и output больше не используются по умолчанию, поэтому вы должны использовать имена, полученные из функции имя

spring:
  cloud:
    stream:
      bindings:
        processMessageaddHeaders-in-0:
          binder: rabbitInput
          destination: inputDestination
        processMessageaddHeaders-out-0:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

. , , или вы можете сопоставить производные имена привязок с чем-то более описательным (например, input, output et c) и использовать это имя вместо

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
        bindings: 
          processMessageaddHeaders-in-0: input  
          processMessageaddHeaders-out-0: output


...