В случае интеграционного тестирования я отправляю запись в Kafka и хотел бы знать, когда она будет обработана и зафиксирована, а затем сделаю мои утверждения (вместо использования Thread.sleep
) ...
Вот моя попытка:
public void sendRecordAndWaitUntilItsNotConsumed(ProducerRecord<String, String> record)
throws ExecutionException, InterruptedException {
RecordMetadata recordMetadata = producer.send(record).get();
TopicPartition topicPartition = new TopicPartition(recordMetadata.topic(),
recordMetadata.partition());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
consumer.assign(Collections.singletonList(topicPartition));
long recordOffset = recordMetadata.offset();
long currentOffset = getCurrentOffset(consumer, topicPartition);
while (currentOffset <= recordOffset) {
currentOffset = getCurrentOffset(consumer, topicPartition);
LOGGER.info("Waiting for message to be consumed - Current Offset = " + currentOffset
+ " - Record Offset = " + recordOffset);
}
}
}
private long getCurrentOffset(KafkaConsumer<String, String> consumer,
TopicPartition topicPartition) {
consumer.seekToEnd(Collections.emptyList());
return consumer.position(topicPartition);
}
Но это не работает. Действительно, я отключил коммит потребителя, и он не зацикливается на Waiting for message to be consumed...