Мы создаем PO C для чтения CD с базой данных C и pu sh его во внешние системы.
- каждый исходный CD с таблицей C отправляется в соответствующие разделы в Формат Avro (с реестром схемы Kafka и сервером Kafka)
- Мы пишем код java для использования сообщений в схеме avro, десериализации его с помощью AvroSerde и присоединения к ним, а затем отправки в разные темы, чтобы он могут использоваться внешними системами.
У нас есть ограничение, которое заключается в том, что мы не можем создавать сообщения в темы исходной таблицы для отправки / получения нового содержимого / изменений. Таким образом, единственный способ написать код соединения - это читать сообщения с начала каждый раз из каждого источника topi c при запуске приложения. (Пока мы не убедимся, что код работает и снова можем начать получать данные в реальном времени)
В объекте KafkaConsumer у нас есть возможность использовать метод seekToBeginning для принудительного чтения с начала в коде jave, что работает. Однако нет возможности, когда мы пытаемся передать topi c с помощью объекта KStream и принудительно прочитать его с самого начала. Какие здесь есть альтернативы?
Мы попытались сбросить смещение с помощью kafka-consumer-groups reset-topi c с параметром --to-early, но это устанавливает смещение только до ближайшего. Когда мы пытаемся сбросить смещение вручную с помощью «0» с параметром --to-offset, мы получаем предупреждение ниже, но не устанавливаем значение «0». Насколько я понимаю, установка значения 0 должна читать сообщения с самого начала. исправьте меня, если я ошибаюсь.
"WARN Новое смещение (0) ниже, чем самое раннее смещение для topi c partition"
Пример кода ниже
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put("schema.registry.url", SCHEMA_REGISTRY_URL);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
StreamsBuilder builder = new StreamsBuilder();
//nothing returned here, when some offset has already been set
KStream myStream = builder.stream("my-topic-in-avro-schema",ConsumedWith(myKeySerde,myValueSerde));
KafkaStreams streams = new KafkaStreams(builder.build(),properties);
streams.start();