Невозможно прочитать заголовок исключения из весеннего облачного потока DLQ kafka - PullRequest
1 голос
/ 26 января 2020

Использование Spring Cloud Stream Kafka Listener для чтения сообщения из kafka topi c и в случае исключения отправка его в очередь недоставленных сообщений с настройкой свойств

spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name=book_error

Я могу отправить сообщение в DLQ. Однако, когда я пытаюсь читать из DLQ, чтобы я мог предпринять действия, основанные на сообщении об ошибке. Я не могу прочитать исключение, встроенное в шапку.

@StreamListener("dlqChannel")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: READING FROM DLQ");
    logger.info("header :" +message.getHeaders());
    logger.info("payload : " +message.getPayload());
    //return message;
}

Кажется, что полезная нагрузка заголовка имеет идентификатор объекта, который я не могу расшифровать. Как мне разобрать ошибку и обработать на основе сообщения об исключении. Ниже приведен заголовок, если я попытаюсь напечатать его

header :{x-original-offset=[B@f82eb25, x-original-partition=[B@7a3b83c, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=book_errors, kafka_offset=0, x-exception-message=[B@6dcc9872, x-exception-fqcn=[B@68079694, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@70449372, x-original-topic=[B@6a3ca71e, x-original-timestamp-type=[B@63baad23, kafka_receivedPartitionId=0, contentType=application/json, x-original-timestamp=[B@37dd34f6, kafka_receivedTimestamp=1579990310188, kafka_groupId=bkerrgrp, x-exception-stacktrace=[B@6356ee7c}

1 Ответ

0 голосов
/ 26 января 2020

Заголовки исключений: byte[] - это единственный тип, который поддерживает Kafka. Различная строковая информация сохраняется как String.getBytes(StandardCharsets.UTF_8)).

Использование

String exceptionMessage = new String(message.getHeaders().get("x-exception-message", byte[].class), StandardCharsets.UTF_8);

Числовые значения c хранятся с использованием соответствующих типов (int, long).

Используйте ByteBuffer.wrap(header).getInt() et c для тех.

Вот код, который хранит заголовки ...

kafkaHeaders.add(
        new RecordHeader(X_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_PARTITION,
        ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_OFFSET,
        ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP,
        ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP_TYPE,
        record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_FQCN,
        throwable.getClass().getName().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
        throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
        getStackTraceAsString(throwable).getBytes(StandardCharsets.UTF_8)));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...