Spring Cloud Stream IntegrationFlow с обменом сообщениями Rabbitmq. Потребитель предоставляет номера ASCII в качестве полезной нагрузки сообщения. - PullRequest
0 голосов
/ 29 апреля 2018

Я использую Spring Cloud Stream для обмена сообщениями. В потребительской части я использовал IntegrationFlow для прослушивания очереди. Он слушает и печатает сообщение со стороны производителя. Но формат другой, с этой проблемой я сталкиваюсь сейчас. Тип содержимого источника: application / json и полезная нагрузка сообщения IntegrationFLow, показывающая номера ASCII. Код написан для потребителя приведен ниже

 @EnableBinding(UserOperationConsume.class)
 public class ConsumerController {

   @Bean
   IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
     return IntegrationFlows
     .from(u.userRegistraionProduces())
     //.transform(Transformers.toJson()) // not working as expected
     //.transform(Transformers.fromJson(UserDTO.class))
     .handle(String.class, (payload, headers) -> {
     System.out.println(payload.toString()); // here the output is 123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,86,105,115,104,110,117,34,44,34,101,109,97,105,108,34,58,34,118... 
     return null;
     }).get();
  }

 }

Интерфейс ввода:

 public interface UserOperationConsume {
  @Input
  public SubscribableChannel userRegistraionProduces();
 }

И потребительская конфигурация yml,

 server:
   port: 8181

 spring:
   application:
   name: nets-alert-service
 ---
 spring:
   cloud:
     config:
       name: notification-service
       uri: http://localhost:8888
 ---    
 spring:
   rabbitmq:
     host: localhost
     port: 5672
     username: guest
     password: guest

   ---
   spring:
     cloud:
       stream:
         bindings:
           userRegistraionProduces:
             destination: userOperations
         input:
           content-type: application/json

Я пробовал привязку Sink.class. Тогда я получил точное сообщение из очереди. Поэтому, пожалуйста, дайте мне знать, если есть какая-либо ошибка в этой конфигурации IntegrationFlow. Потому что я новичок в весеннем облачном потоке и IntegrationFlow. Есть ли способ преобразовать эту ASCII в точную строку? Заранее спасибо

1 Ответ

0 голосов
/ 29 апреля 2018

Использование IntegrationFlows.from(channel) не дает подсказок по конверсии, поэтому вы просто получаете необработанную byte[] полезную нагрузку (содержащую JSON). Непонятно, почему вы тогда используете toJson() трансформатор.

Ваш .handle(String.class, (payload, headers) -> {... вызывает использование простого ArrayToStringConverter, поэтому вы видите каждое значение байта.

В любом случае вы неправильно используете фреймворк. Используйте ...

@StreamListener("userRegistraionProduces")
public void listen(UserDTO dto) {
    System.out.println(dto);
}

... и фреймворк позаботится о преобразовании за вас. Или ...

@StreamListener("userRegistraionProduces")
public void listen(Message<UserDTO> dtoMessage) {
    System.out.println(dtoMessage);
}

, если ваш производитель передает дополнительную информацию в заголовках.

EDIT

Если вы предпочитаете делать преобразование самостоятельно, это прекрасно работает ...

@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
    return IntegrationFlows.from(u.userRegistraionProduces())
            .transform(Transformers.fromJson(UserDTO.class))
            .handle((payload, headers) -> {
                System.out.println(payload.toString());
                return null;
            }).get();
}

... поскольку преобразователь Json может читать byte[].

...