Spring kafka: не удалось отправить сообщение на канал полезной нагрузки не должно быть нулевым - PullRequest
0 голосов
/ 29 июня 2018

У меня есть два микросервиса проекта с пружинной загрузкой 1.5.9.RELEASE, первый микросервис отправляет пользователя в тему, а второй микросервис использует это сообщение, когда я отправляю пользователю, во втором микросервисе отображается ошибка ниже

Трассировка стека:

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'verification-token-in'; nested exception is java.lang.IllegalArgumentException: payload must not be null
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:54) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:288) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:279) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:79) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.1.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_112]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_112]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_112]
Caused by: java.lang.IllegalArgumentException: payload must not be null
    at org.springframework.util.Assert.notNull(Assert.java:134) ~[spring-core-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:57) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:53) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:265) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:540) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:417) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    ... 31 common frames omitted

Кафка зависимость:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Выход (Первый Микросервис):

spring:
  cloud:
    stream:
      bindings:
        verification-token-out:
          destination: verification.token
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181

public interface UserStreams {

    String OUTPUT_VERIFICATION_TOKEN = "verification-token-out";

    @Output(UserStreams.OUTPUT_VERIFICATION_TOKEN)
    public MessageChannel outputVerificationToken();

}

@EnableBinding(UserStreams.class)
public class UserServiceImpl {

    @Autowired
    private UserStreams source;

    @Override
    public void createVerificationTokenForUser(UserDto userSender) {
     source.outputVerificationToken().send(MessageBuilder.withPayload(userSender).build());
    }

}

Вход (второй микросервис):

spring:
  cloud:
    stream:
      bindings:
        verification-token-in:
          destination: verification.token
          contentType: application/octet-stream
          originalContentType: application/json
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181

public interface MailStreams {

    String INPUT_VERIFICATION_TOKEN = "verification-token-in";

    @Input(MailStreams.INPUT_VERIFICATION_TOKEN)
    SubscribableChannel inputVerificationToekn();

}

@EnableBinding(MailStreams.class)
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    @StreamListener(MailStreams.INPUT_VERIFICATION_TOKEN)
    public void verificationToken(final UserDto payload) {
        LOGGER.info("Received payload='{}'", payload);
    }

}

1 Ответ

0 голосов
/ 29 июня 2018

[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]

Проблема была связана с неправильным типом контента; это исправлено в 1.3.x. (Ditmars).

...