Сообщения не отправлены в тему DLQ - PullRequest
0 голосов
/ 17 июня 2020

Я хочу использовать DLQ для исключений

Вот application.yml Topi c успешно создан, но я не получаю сообщение об исключении в моем DLQ topi c

spring:
  cloud:
    stream:
      default:
        consumer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers:
            - localhost:9092
          consumer-properties:
            key.deserializer : org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
            specific.avro.reader: true
            enable.auto.commit: true
        bindings:
          resourceInventoryInput:
            consumer:
              autoCommitOffset: true
              autoCommitOnError: true
              enableDlq: true
              dlqName: dead-out
              dlqProducerProperties:
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      bindings:
        resourceInventoryInput:
          binder: kafka
          destination: ${application.messaging.topic}
          content-type: application/*+avro
          group: ${application.messaging.group}
      default-binder: kafka

1 Ответ

0 голосов
/ 17 июня 2020

Не задавайте один и тот же вопрос в нескольких местах; это пустая трата вашего и нашего времени.

Я уже ответил вам на GitHub .

Я только что протестировал ваш yaml с помощью Boot 2.1.15 и Greenwich.SR6 (и Загрузите 2.2.8 / Hoxton.SR5), и он работает нормально. Единственные изменения, которые я внес, - это изменить имя привязки для ввода и комментирования материала avro.

@SpringBootApplication
@EnableBinding(Sink.class)
public class Kbgh9181Application {

    public static void main(String[] args) {
        SpringApplication.run(Kbgh9181Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        throw new RuntimeException("foo");
    }

    @KafkaListener(id = "kbgh918", topics = "dead-out", properties = "auto.offset.reset:earliest")
    public void listen(Message<?> in) {
        System.out.println(in);
    }

}

GenericMessage [payload = byte [3], headers = {x-original-offset = [B@67917b81, x-original-partition = [B@467895cd, kafka_timestampType = CREATE_TIME, kafka_receivedMessageKey = null, kafka_receivedTopic = dead-out, kafka_offset = 5, x-exception-message = [B@51def01e, x-exception-fqcn] = [B@531d42e5, kafka_consumer = org. apache .kafka.clients.consumer. KafkaConsumer@3fbc6674, x-original-topic = [B@3d684ab3, x-original-timestamp-type = [B@1b101300, kafka_receivedPartitionId = 0, x-original-timestamp = [B@222370ed, kafka_receivedTimestamp = 1592402977606, x-exception-stacktrace = [B@7e703d1b}]

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...