Есть ли способ читать сообщения с использованием потока Kafka (не через KafkaConsumer) с начала каждый раз в java? - PullRequest
0 голосов
/ 04 августа 2020

Мы создаем PO C для чтения CD с базой данных C и pu sh его во внешние системы.

  1. каждый исходный CD с таблицей C отправляется в соответствующие разделы в Формат Avro (с реестром схемы Kafka и сервером Kafka)
  2. Мы пишем код java для использования сообщений в схеме avro, десериализации его с помощью AvroSerde и присоединения к ним, а затем отправки в разные темы, чтобы он могут использоваться внешними системами.

У нас есть ограничение, которое заключается в том, что мы не можем создавать сообщения в темы исходной таблицы для отправки / получения нового содержимого / изменений. Таким образом, единственный способ написать код соединения - это читать сообщения с начала каждый раз из каждого источника topi c при запуске приложения. (Пока мы не убедимся, что код работает и снова можем начать получать данные в реальном времени)

В объекте KafkaConsumer у нас есть возможность использовать метод seekToBeginning для принудительного чтения с начала в коде jave, что работает. Однако нет возможности, когда мы пытаемся передать topi c с помощью объекта KStream и принудительно прочитать его с самого начала. Какие здесь есть альтернативы?

Мы попытались сбросить смещение с помощью kafka-consumer-groups reset-topi c с параметром --to-early, но это устанавливает смещение только до ближайшего. Когда мы пытаемся сбросить смещение вручную с помощью «0» с параметром --to-offset, мы получаем предупреждение ниже, но не устанавливаем значение «0». Насколько я понимаю, установка значения 0 должна читать сообщения с самого начала. исправьте меня, если я ошибаюсь.

"WARN Новое смещение (0) ниже, чем самое раннее смещение для topi c partition"

Пример кода ниже

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put("schema.registry.url", SCHEMA_REGISTRY_URL);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);

StreamsBuilder builder = new StreamsBuilder();
//nothing returned here, when some offset has already been set
KStream myStream = builder.stream("my-topic-in-avro-schema",ConsumedWith(myKeySerde,myValueSerde)); 

KafkaStreams streams = new KafkaStreams(builder.build(),properties);
streams.start();

Ответы [ 2 ]

0 голосов
/ 07 августа 2020

Это поможет тому, кто сталкивается с той же проблемой. Замените Application Id и Group Id некоторым уникальным идентификатором, используя UUID.randomId.toString () в свойстве конфигурации. Он должен получать сообщения с начала

0 голосов
/ 04 августа 2020

Один из способов сделать это - создавать случайную группу ConsumerGroup при каждом запуске потокового приложения. Что-то вроде:

properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + currentTimestamp);

Таким образом, поток начнет чтение с "самого раннего", поскольку вы уже установили его в auto.offset.reset.

Кстати, вы устанавливаете свойства для group.id дважды в вашем коде ...

...