Учитывая название темы, номер раздела и смещение, как я могу прочитать только одну запись из темы?
В моем приложении на основе Sprng Boot я использую Kafka для импорта бизнес-данных.Записи импорта отправляются на import_queue и используются одним или несколькими бизнес-модулями.Записи всегда подтверждаются, даже если потребитель не может импортировать данные из записи, чтобы продолжить импорт данных из следующих записей.
Позже пользователь (после того, как он исправил некоторые зависимые бизнес-данные) может решить повторно-Отправить одну или несколько неудачных (но подтвержденных) записей импорта.
Смещение, номер раздела и имя темы каждой записи хранятся в моем приложении внутри базы данных SQL.
Из ссылкиДокументация и некоторые вопросы StackOverflow Я узнал, что мне нужно:
- настроить контейнер (потребитель / слушатель)
- перемотать (искать) до нужного смещения
- прочитать одну запись
- пропустить чтение оставшихся записей
Это единственный способ прочитать только одну старую запись из темы Кафки?Или есть более простое решение?
Решение
По предложению @Gary:
ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
Map<String, Object> configs = Map.of(
"bootstrap.servers", "localhost:9092",
"group.id", "incubator_retry",
"max.poll.records", 1);
DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
if (consumerRecords.isEmpty()) {
throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
topicPartition.topic(), topicPartition.partition(), offset));
}
return consumerRecords.iterator().next();
}
}