Невозможно подключиться с Java к Kafka, работающему в Docker - PullRequest
0 голосов
/ 26 мая 2020

Попытка передать MySql базу данных в Kafka с помощью Debezium.

Итак, в Docker контейнере я запустил Zookeeper, Kafka, MySQL базу данных, MySQL командную строку и Kafka Connect.

Когда я запускаю любые команды DML в командной строке MySQL, я могу видеть события изменения в окне наблюдателя, которое я запустил в docker. Так что на данный момент все выглядит нормально. пожалуйста, найдите то же самое ниже.

Docker Setup

Теперь я пытаюсь использовать события изменения из кода Java, который я могу видеть в окне наблюдателя всякий раз, когда я выполняю любой Команды DML в командной строке MySQL. Пожалуйста, найдите ниже для Потребителя.

            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            Consumer<String, String> consumer = new KafkaConsumer<>(properties);
            ArrayList<String> topics = new ArrayList<>();
            topics.add("dbserver1.inventory.customers");
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Message received: " + record.value());
                }
                consumer.commitAsync();
            }

Невозможно использовать события изменения данных от указанного выше потребителя. Пожалуйста, дайте мне знать, если что-то нужно сделать.

Ответы [ 2 ]

0 голосов
/ 26 мая 2020

Разница между тем, что работало, и тем, что не работало, - это порт слушателя, к которому вы обращались.

Таким образом, либо:

  • 29092 было просто неправильным, и 9092 правильными
  • 29092 было привязано к объявленному слушателю внутри сети Docker, а 9092 - к localhost ( подробнее )
0 голосов
/ 26 мая 2020

Я решил это, изменив свойство BOOTSTRAP_SERVERS_CONFIG, как показано ниже, и теперь работает нормально. Возможность потреблять изменения данных.

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

и потреблять изменения данных

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(1000L);
  for (ConsumerRecord<String, String> record : records) {
    String payload = record.value();
    if(payload.contains("payload")) {
    JsonObject root = (JsonObject) new JsonParser().parse(payload);
    root = root.getAsJsonObject("payload").getAsJsonObject("after");
   }
  }
  consumer.commitAsync();
}

Это сработало для меня. :)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...