Я разрабатываю приложение PO C с SpringBoot и SpringClodStreamKafkaBinder для обмена сообщениями. Концепция basi c не очень сложна. Мне нужно иметь возможность получать сообщения от кафки, а также мне нужно создавать некоторые сообщения для кафки. Но эти два потока разделены. Один поток будет читать, а другой писать (не будет никакой ситуации, когда одному потоку нужно будет прочитать из kafka, выполнить что-то и затем записать в другой kafka topi c). Итак, мой код выглядит так:
spring:
kafka:
consumer:
properties:
isolation.level: read_committed
bootstrap-servers: localhost:9092
cloud.stream:
kafka:
############ KAFKA BINDER CONFIGURATION
binder:
configuration:
max.poll.records: 1
max.poll.interval.ms: 300000
min-partition-count: 2
brokers: localhost
auto-create-topics: true
auto-add-partitions: true
############ KAFKA BINDINGS CONFIGURATION
bindings:
SOME_INPUT:
consumer:
ack-each-record: true
start-offset: earliest
enable-dlq: true
dlq-name: SOME_INPUT.DLQ
dlqProducerProperties.configuration:
key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
############ SPRING CLOUD STREAM BINDINGS CONFIGURATION
bindings:
SOME_INPUT:
group: SOME_INPUT_GROUP
destination: SOME_INPUT
content-type: application/json
consumer:
max-attempts: 2
back-off-initial-interval: 10000
back-off-max-interval: 20000
concurrency: ${spring.cloud.stream.kafka.binder.min-partition-count}
и в java
@RequiredArgsConstructor
class EventsListener {
@StreamListener(SOME_INPUT)
void handle(@Payload Event event, @Headers Map<String, Object> headers) {
...
}
}
interface EventsStreams {
@Input(SOME_INPUT)
SubscribableChannel someInput();
}
И он отлично работает. Я настроил 2 попытки для повторной доставки сообщения, и spring пытается обработать сообщение только два раза. После этого сообщение будет отправлено на номер SOME_INPUT.DLQ
topi c. Но я хочу иметь производителей @Transactional
, поэтому, согласно документации, мне нужно добавить в мою конфигурацию:
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix: kafka-tx
И после добавления такого свойства: потребитель больше не работает должным образом. Заголовок deliveryAttempt
теперь отсутствует в заголовках сообщений kafka, и если сообщение не обрабатывается должным образом (я имею в виду, что из метода handle
в EventsListener
выдается какой-то RuntimeException
), то сообщение повторно доставляется в бесконечном l oop.
Что я делаю не так?
Нужно ли мне писать настраиваемое управление DLQ на случай включения транзакций kafka?
В чем тогда смысл SpringCloudStream, если мне нужно делать все вручную?
Буду очень рад услышать от Вас любые объяснения.