Вы спрашиваете, как найти, какие записи дублируются в Кафке, и мой ответ в первую очередь направлен на предотвращение дублирования. Насколько я знаю, выявление дубликатов в Topi c может быть выполнено только с помощью специального приложения.
Что вам нужно, чтобы избежать дубликатов, так это семантика "точно один раз" для Kafka. Об этом есть множество хороших блогов в Интернете.
Основная идея заключается в том, чтобы включить продюсер безграничного влияния, установив enable.idempotence=true
. Чтобы гарантировать, что только один запрос может быть отправлен брокеру за раз, вы можете установить max.in.flight.requests.per.connection=1
. С acks
вы можете настроить долговечность отправляемых сообщений. Для семантики "точно один раз" она должна быть установлена на acks=all
.
Кроме того, теперь Kafka поддерживает запись atomi c через несколько разделов через новый API транзакций. Следующий пример кода приведен в связанном блоге:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
initTransactions : должен вызываться перед началом каждой новой транзакции; генерирует исключение, если активен другой производитель с тем же транзакцией.id.
beginTransaction : Должен быть вызван, чтобы сигнализировать о начале новой транзакции. Производитель записывает локальное состояние, указывающее, что транзакция началась, но транзакция не начнется с точки зрения координатора, пока не будет отправлена первая запись.
commitTransaction : должен быть вызван для начала Координатор процесса совершения транзакции.
На стороне потребителя необходимо убедиться, что потребитель читает только подтвержденные транзакционные сообщения. Это может быть достигнуто путем установки isolation.level
в read_committed
(по умолчанию: read_uncommitted
).