Как повторно отправить (прочитать) старое сообщение kafka из заданной темы и раздела с определенным смещением, используя spring-kafka? - PullRequest
0 голосов
/ 14 февраля 2019

Учитывая название темы, номер раздела и смещение, как я могу прочитать только одну запись из темы?

В моем приложении на основе Sprng Boot я использую Kafka для импорта бизнес-данных.Записи импорта отправляются на import_queue и используются одним или несколькими бизнес-модулями.Записи всегда подтверждаются, даже если потребитель не может импортировать данные из записи, чтобы продолжить импорт данных из следующих записей.

Позже пользователь (после того, как он исправил некоторые зависимые бизнес-данные) может решить повторно-Отправить одну или несколько неудачных (но подтвержденных) записей импорта.

Смещение, номер раздела и имя темы каждой записи хранятся в моем приложении внутри базы данных SQL.

Из ссылкиДокументация и некоторые вопросы StackOverflow Я узнал, что мне нужно:

  1. настроить контейнер (потребитель / слушатель)
  2. перемотать (искать) до нужного смещения
  3. прочитать одну запись
  4. пропустить чтение оставшихся записей

Это единственный способ прочитать только одну старую запись из темы Кафки?Или есть более простое решение?

Решение

По предложению @Gary:

ConsumerRecord<byte[], byte[]> read(String topic, int partition, long offset) {
    Map<String, Object> configs = Map.of(
            "bootstrap.servers", "localhost:9092",
            "group.id", "incubator_retry",
            "max.poll.records", 1);
    DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(
            configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());

    try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(List.of(topicPartition));
        consumer.seek(topicPartition, offset);
        ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(5000));
        if (consumerRecords.isEmpty()) {
            throw new RuntimeException(String.format("Timeout polling from topic %s partition %d at offset %d",
                    topicPartition.topic(), topicPartition.partition(), offset));
        }
        return consumerRecords.iterator().next();
    }
}

1 Ответ

0 голосов
/ 14 февраля 2019

Существует более простое решение.

  • Используйте DefaultConsumerFactory для создания KafkaConsumer (или просто создайте его)
  • Используйте другое group.id
  • Установите для свойства max.poll.records значение 1
  • consumer.assign(...) нужную тему / раздел
  • seek(...) до требуемого смещения
  • poll(...), пока не получитезапись
  • close() потребитель

Если вы используете какое-либо преобразование сообщений (кроме десериализаторов Kafka), вам придется вызывать конвертер вручную.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...