Spring Cloud Stream Kafka Binder - Включение транзакций и обработка DLQ - PullRequest
0 голосов
/ 06 мая 2020

Я разрабатываю приложение PO C с SpringBoot и SpringClodStreamKafkaBinder для обмена сообщениями. Концепция basi c не очень сложна. Мне нужно иметь возможность получать сообщения от кафки, а также мне нужно создавать некоторые сообщения для кафки. Но эти два потока разделены. Один поток будет читать, а другой писать (не будет никакой ситуации, когда одному потоку нужно будет прочитать из kafka, выполнить что-то и затем записать в другой kafka topi c). Итак, мой код выглядит так:

spring:
  kafka:
    consumer:
      properties:
        isolation.level: read_committed
    bootstrap-servers: localhost:9092
  cloud.stream:
    kafka:

      ############ KAFKA BINDER CONFIGURATION
      binder:
        configuration:
          max.poll.records: 1 
          max.poll.interval.ms: 300000 
        min-partition-count: 2 
        brokers: localhost
        auto-create-topics: true
        auto-add-partitions: true

      ############ KAFKA BINDINGS CONFIGURATION
      bindings:
        SOME_INPUT:
          consumer:
            ack-each-record: true 
            start-offset: earliest 
            enable-dlq: true
            dlq-name: SOME_INPUT.DLQ 
            dlqProducerProperties.configuration:
              key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
              value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer 

    ############ SPRING CLOUD STREAM BINDINGS CONFIGURATION
    bindings:
      SOME_INPUT:
        group: SOME_INPUT_GROUP
        destination: SOME_INPUT
        content-type: application/json
        consumer:
          max-attempts: 2 
          back-off-initial-interval: 10000 
          back-off-max-interval: 20000
          concurrency: ${spring.cloud.stream.kafka.binder.min-partition-count}

и в java

@RequiredArgsConstructor
class EventsListener {

    @StreamListener(SOME_INPUT)
    void handle(@Payload Event event, @Headers Map<String, Object> headers) {
      ...
    }
}


interface EventsStreams {

    @Input(SOME_INPUT)
    SubscribableChannel someInput();

}


И он отлично работает. Я настроил 2 попытки для повторной доставки сообщения, и spring пытается обработать сообщение только два раза. После этого сообщение будет отправлено на номер SOME_INPUT.DLQ topi c. Но я хочу иметь производителей @Transactional, поэтому, согласно документации, мне нужно добавить в мою конфигурацию:

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix: kafka-tx

И после добавления такого свойства: потребитель больше не работает должным образом. Заголовок deliveryAttempt теперь отсутствует в заголовках сообщений kafka, и если сообщение не обрабатывается должным образом (я имею в виду, что из метода handle в EventsListener выдается какой-то RuntimeException), то сообщение повторно доставляется в бесконечном l oop.

Что я делаю не так?

Нужно ли мне писать настраиваемое управление DLQ на случай включения транзакций kafka?

В чем тогда смысл SpringCloudStream, если мне нужно делать все вручную?

Буду очень рад услышать от Вас любые объяснения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...