KafkaConsumer не может опрашивать сообщения, но kafka-console-consumer.sh может работать, почему? - PullRequest
0 голосов
/ 30 мая 2019

Мой сервер устанавливает Кафку, использует docker: wurstmeister / kafka.

Когда я использую kafka-клиентов 2.2.0 для создания и потребления сообщения kafka, мой производитель работает хорошо, но потребитель не может получить никакого сообщения.

но пока я использую shell, эта команда не может получить сообщение:

/opt/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 10.2.0.242:9092 --topic mytesttopic --from-beginning

и эта команда может получить все сообщения:

/opt/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 10.2.0.242:9092 --topic mytesttopic --from-beginning --partition 0
String  kafkaServer = "10.2.0.242:9092";
String defaultTopic = "mytesttopic";

// create topic
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
try (AdminClient client = AdminClient.create(props)) {
    CreateTopicsResult ret = client.createTopics(Arrays.asList(new NewTopic(defaultTopic, 1, (short) 1)));
    ret.all().get();
}

// send message
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> msg = new ProducerRecord<>(defaultTopic, "a1", "test1");
Future ret = producer.send(msg);
System.out.println("send ok: " + ret.get());

// recieve message
props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("group.id", "testaaa4aaa");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
TopicPartition partition1 = new TopicPartition(defaultTopic, 0);
consumer.assign(Arrays.asList(partition1));
//        consumer.subscribe(Arrays.asList(defaultTopic));
Duration duration = Duration.ofMillis(100);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(duration);
// here always get 0.
    System.out.println(records.count());
}

Я попытался установить TopicPartition в своем коде, но все еще не могу получить сообщение, кто-нибудь может мне помочь?

1 Ответ

0 голосов
/ 04 июня 2019

Как видно из сообщения консоли, координатор вашей группы недоступен.Вы недавно обновляли / меняли свой образ Docker?Дело в том, что если вы не задаете group.id или если вы вручную указываете раздел для потребителя консоли, вы не используете управление группами Kafka (и, следовательно, не существует координатора).Так что имеет смысл, что это решает проблему.Что произойдет, если вы используете другой идентификатор группы?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...