Для печати дубликатов в kafka topi c на экран в java - PullRequest
0 голосов
/ 15 апреля 2020

Я использовал следующий код для массового производства некоторых сообщений.

public class ProducerDemoWithCallback {

  public static void main(String[] args) {

      final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

    String bootstrapServers = "localhost:9092";
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);


for (int i=0; i<100; i++ ) {
    // create a producer record
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("TwitterProducer", "This is message no: " + Integer.toString(i));

    // send data - asynchronous
    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // executes every time a record is successfully sent or an exception is thrown
            if (e == null) {
                // the record was successfully sent
                logger .info("Received new metadata. \n" +
                        "Topic:" + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());


            } else {
                logger .error("Error while producing", e);
            }
        }
    });
}

// flush data
producer.flush();
// flush and close producer
producer.close();
 }
 }

Я массово отправляю некоторые сообщения в kafka topi c, а затем мое приложение отправляет те же сообщения в другой топ kafka c. Эти две темы должны содержать одинаковые сообщения, но я получаю несколько повторяющихся сообщений во второй топи c.

, когда я заглядываю в кафку топи c, кажется, что некоторые сообщения появляются там два раза. например, mychannel2 имеет такие данные, как

This is message no: 2
This is message no: 3
This is message no: 4
This is message no: 5
This is message no: 6
.
.
This is message no: 18
This is message no: 19
.
.
This is message no: 18
This is message no: 19
.
.
This is message no: 99

enter image description here

, поэтому сообщения 18 и 19 et c дублируются.

Существует ли какая-либо команда kafka или общий пример кода java, который можно использовать для печати дублирующихся данных в топике kafka c. Спасибо

1 Ответ

0 голосов
/ 15 апреля 2020

Вы спрашиваете, как найти, какие записи дублируются в Кафке, и мой ответ в первую очередь направлен на предотвращение дублирования. Насколько я знаю, выявление дубликатов в Topi c может быть выполнено только с помощью специального приложения.

Что вам нужно, чтобы избежать дубликатов, так это семантика "точно один раз" для Kafka. Об этом есть множество хороших блогов в Интернете.

Основная идея заключается в том, чтобы включить продюсер безграничного влияния, установив enable.idempotence=true. Чтобы гарантировать, что только один запрос может быть отправлен брокеру за раз, вы можете установить max.in.flight.requests.per.connection=1. С acks вы можете настроить долговечность отправляемых сообщений. Для семантики "точно один раз" она должна быть установлена ​​на acks=all.

Кроме того, теперь Kafka поддерживает запись atomi c через несколько разделов через новый API транзакций. Следующий пример кода приведен в связанном блоге:

producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
} catch(ProducerFencedException e) {
  producer.close();
} catch(KafkaException e) {
  producer.abortTransaction();
}

initTransactions : должен вызываться перед началом каждой новой транзакции; генерирует исключение, если активен другой производитель с тем же транзакцией.id.

beginTransaction : Должен быть вызван, чтобы сигнализировать о начале новой транзакции. Производитель записывает локальное состояние, указывающее, что транзакция началась, но транзакция не начнется с точки зрения координатора, пока не будет отправлена ​​первая запись.

commitTransaction : должен быть вызван для начала Координатор процесса совершения транзакции.

На стороне потребителя необходимо убедиться, что потребитель читает только подтвержденные транзакционные сообщения. Это может быть достигнуто путем установки isolation.level в read_committed (по умолчанию: read_uncommitted).

...