KafkaConsumerAPI - подключается от затмения к брокеру, работающему в vm, через vagrant - PullRequest
0 голосов
/ 04 апреля 2019

Что такое advertised.listeners .Как это поможет подключиться к брокеру Kafka, работающему на виртуальной машине из Eclipse?

Я написал код KafkaConsumer и протестировал его со службой, работающей локально.Я пытаюсь использовать тот же код для подключения к Kafka Broker, работающему в ВМ, но записи не используются.Пожалуйста, объясните необходимую конфигурацию для достижения результата.

package kafkaExample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Consumer {
  public static void main(String[] args) {

    final List<PartitionInfo> partitionInfos;


    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
    /*    props.put("zookeeper.connect", "10.30.3.2:2181,10.30.3.3:2181");*/
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cgrp1");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-tutorial");


    final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    kafkaConsumer.subscribe(Arrays.asList(args[1]),
        new ConsumerRebalanceListener() {
          @Override
          public void onPartitionsRevoked(Collection<TopicPartition> arg0) {

        }

          @Override
          public void onPartitionsAssigned(Collection<TopicPartition> tps) {

        }
        });

    System.out.println("list of topics " + kafkaConsumer.listTopics());

    ConsumerRecords<String, String> records = null;
    while (true) {
      records = kafkaConsumer.poll(Duration.ofMillis(100));
      if (records == null || records.count() == 0) {
        System.out.println("There is no message. try again.");
        continue;
      }

      final Iterator<ConsumerRecord<String, String>> iter = records.iterator();
      while (iter.hasNext()) {
        final ConsumerRecord<String, String> cr = iter.next();
        System.out.println("Consume msg: " + cr.value());
      }
      kafkaConsumer.commitSync();
      kafkaConsumer.close();
      break;
    }
  }
}

1 Ответ

0 голосов
/ 07 апреля 2019

Рекламируемые слушатели скажут вашим клиентам подключиться к адресу, отличному от основного адреса самого сервера.

Например, у вашей виртуальной машины есть как минимум три интерфейса: localhost, внутренняя подсеть виртуальной машины для связи между несколькими виртуальными машинами, а затем, возможно, еще одна, чтобы внешний хост мог подключиться к этой виртуальной машине.

Третий не может быть там, и если нет, то вам нужно настроить переадресацию портов с хоста на внутренний интерфейс виртуальной машины. В этом случае порт будет перенаправлен с вашего хоста 9092 (может быть любым числом) на сервер 9092 Kafka, тогда вы установите для объявленных слушателей значение PLAINTEXT://127.0.0.1:9092 (или номер, выбранный с хоста) и сделаете приложение на хосте затем подключается к 127.0.0.1:9092, а не к виртуальному адресу

Если бы у вас был кластер виртуальных машин на одном хосте, вам пришлось бы использовать разные порты при пересылке, как 127.0.0.1:9092,127.0.0.1:9093

...