Попытка передать MySql базу данных в Kafka с помощью Debezium.
Итак, в Docker контейнере я запустил Zookeeper, Kafka, MySQL базу данных, MySQL командную строку и Kafka Connect.
Когда я запускаю любые команды DML в командной строке MySQL, я могу видеть события изменения в окне наблюдателя, которое я запустил в docker. Так что на данный момент все выглядит нормально. пожалуйста, найдите то же самое ниже.
Теперь я пытаюсь использовать события изменения из кода Java, который я могу видеть в окне наблюдателя всякий раз, когда я выполняю любой Команды DML в командной строке MySQL. Пожалуйста, найдите ниже для Потребителя.
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("dbserver1.inventory.customers");
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1L);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message received: " + record.value());
}
consumer.commitAsync();
}
Невозможно использовать события изменения данных от указанного выше потребителя. Пожалуйста, дайте мне знать, если что-то нужно сделать.