Как узнать, когда запись совершается в Кафке? - PullRequest
0 голосов
/ 03 мая 2018

В случае интеграционного тестирования я отправляю запись в Kafka и хотел бы знать, когда она будет обработана и зафиксирована, а затем сделаю мои утверждения (вместо использования Thread.sleep) ...

Вот моя попытка:

public void sendRecordAndWaitUntilItsNotConsumed(ProducerRecord<String, String> record)
      throws ExecutionException, InterruptedException {

    RecordMetadata recordMetadata = producer.send(record).get();
    TopicPartition topicPartition = new TopicPartition(recordMetadata.topic(),
        recordMetadata.partition());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {

      consumer.assign(Collections.singletonList(topicPartition));

      long recordOffset = recordMetadata.offset();
      long currentOffset = getCurrentOffset(consumer, topicPartition);

      while (currentOffset <= recordOffset) {
        currentOffset = getCurrentOffset(consumer, topicPartition);
        LOGGER.info("Waiting for message to be consumed - Current Offset = " + currentOffset
            + " - Record Offset = " + recordOffset);
      }
    }
  }

  private long getCurrentOffset(KafkaConsumer<String, String> consumer,
      TopicPartition topicPartition) {

    consumer.seekToEnd(Collections.emptyList());

    return consumer.position(topicPartition);
  }

Но это не работает. Действительно, я отключил коммит потребителя, и он не зацикливается на Waiting for message to be consumed...

1 Ответ

0 голосов
/ 07 мая 2018

Работает с использованием KafkaConsumer#committed вместо KafkaConsumer#position.

private void sendRecordAndWaitUntilItsNotConsumed(ProducerRecord<String, String> record) throws InterruptedException, ExecutionException {

        RecordMetadata recordMetadata = producer.send(record).get();

        TopicPartition topicPartition = new TopicPartition(recordMetadata.topic(),
                recordMetadata.partition());

        consumer.assign(Collections.singletonList(topicPartition));

        long recordOffset = recordMetadata.offset();
        long currentOffset = getCurrentOffset(topicPartition);

        while (currentOffset < recordOffset) {
            currentOffset = getCurrentOffset(topicPartition);
            LOGGER.info("Waiting for message to be consumed - Current Offset = " + currentOffset
                    + " - Record Offset = " + recordOffset);
            TimeUnit.MILLISECONDS.sleep(200);
        }
    }

    private long getCurrentOffset(TopicPartition topicPartition) {
        OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
        return offsetAndMetadata != null ? offsetAndMetadata.offset() - 1 : -1;
    }
...