Я пытаюсь настроить один раз семантику в Kafka (Apache Beam).Вот изменения, которые я собираюсь представить:
Производитель:
enable.idenpotence
= true
transactional.id
= uniqueTransactionalId
Потребитель:
set enable.auto.commit
= false
// добавлено следующее для потребителя:
.commitOffsetsInFinalize()
.withReadCommitted()
Добавлено следующее к KafkaIO#write
builder:
.withEOS(numShards, sinkGroupId)
Кто-нибудь знает, что еще нужно изменить, чтобы достичь ровно семантики в Apache Beam KafkaIO?
Вышеприведенная конфигурация выглядит хорошо, или я что-то неправильно понял?
Нужно ли указывать свойство transactional.id
, если я не использую API транзакций (потому что у меня нет явного производителя в apache beam)?