org.springframework.kafka.listener.ListenerExecutionFailedException: метод прослушивателя не может быть вызван с входящим сообщением - PullRequest
0 голосов
/ 21 сентября 2018

Я новичок в Apache Kafka и могу отправить сообщение (в формате JSON) от отправителя, но не могу использовать в службе поддержки клиентов.

Это мой код:

Служба отправителей

@Service
public class SenderService {
    private static final Logger LOG = LoggerFactory.getLogger(SenderService.class);

    @Autowired
    private KafkaTemplate<String, IdName> kafkaTemplate;

    @Value("${app.topic.email}")
    private String topic;

    public void send(IdName idName) {
        LOG.info("Sending Data='{}' to topic='{}' ", idName, topic);

        Message<IdName> message = MessageBuilder.withPayload(idName).setHeader(KafkaHeaders.TOPIC, topic)
            .setHeader(KafkaHeaders.MESSAGE_KEY, "TestMessage")
            .build();
        kafkaTemplate.send(message);
    }
}

###Consumer Service

@Service
public class ConsumerService {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    MailSender mailSender;

    @KafkaListener(topics = "${app.topic.email}")
    public void receive(@Payload IdName data,
        @Header MessageHeaders headers) throws Exception{
        LOG.info("Received data='{}'", data);
    }
 }

Я получаю следующее исключение

2018-09-21 11:01:41.738 ERROR 63487 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = emailclient, partition = 0, offset = 4, CreateTime = 1537507901660, serialized key size = 11, serialized value size = 122, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 98, 99, 112, 108, 117, 115, 100, 46, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 46, 100, 111, 109, 97, 105, 110, 46, 73, 100, 78, 97, 109, 101])], isReadOnly = false), key = TestMessage, value = com.*****.****.domain.IdName@4a2d3006)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.*****.****.service.ConsumerService.receive(com.*****.****.domain.IdName,org.springframework.messaging.MessageHeaders) throws java.lang.Exception]

Bean [com.*****.****.service.ConsumerService@4649d70a]; nested exception is org.springframework.messaging.MessageHandlingException: Missing header 'headers' for method parameter type [class org.springframework.messaging.MessageHeaders], failedMessage=GenericMessage [payload=com.*****.****.domain.IdName@4a2d3006, headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@53fdf4bb, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=TestMessage, kafka_receivedPartitionId=0, kafka_receivedTopic=emailclient, kafka_receivedTimestamp=1537507901660, __TypeId__=[B@707343c2}]

Кто-нибудь может мне помочь?

1 Ответ

0 голосов
/ 21 сентября 2018

Если вы хотите иметь доступ ко всем заголовкам в слушателе, вы используете неправильную аннотацию, она должна быть @Headers вместо @Header.Полученный код выглядит следующим образом:

@Service
public class ConsumerService {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    MailSender mailSender;

    @KafkaListener(topics = "${app.topic.email}")
    public void receive(@Payload IdName data,
        @Headers MessageHeaders headers) throws Exception{
        LOG.info("Received data='{}'", data);
    }
 }

Конечно, вы также можете ввести ключ сообщения только так:

@Service
public class ConsumerService {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    MailSender mailSender;

    @KafkaListener(topics = "${app.topic.email}")
    public void receive(@Payload IdName data,
        @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) throws Exception{
        LOG.info("Received data='{}'", data);
    }
 }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...