Производителю Kafka не удается создать сообщение через сервер Bootstrap.как настроить брокерский хост? - PullRequest
0 голосов
/ 31 января 2019

Получение исключения тайм-аута с API Kafka Producer.

Ручной процесс создания сообщения Kafka

Мы подключаемся к серверу kafka: xxxx Там мы вводим команду для производителя

/kafka/bin/kafka-console-producer.sh --broker BrokerHostAddress:9092 --topic TestTopic
{ValidJsonData}
ниже

Сервер Kafka и BrokerHostAddress - это разные адреса.

Попытка создать сообщение по вышеуказанной теме через Java + kafka-клиентов (2.1.0), получая: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: не удалось обновить метаданные после 60000 мс.

String bootstrap_Server = "x.x.x.x:port"
private static KafkaProducer<String, String> producer = null;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Server);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
producer = new KafkaProducer<String, String>(props);

kafkaInTopicName = "TestTopic";
Key = "123123";
value = "{ValidJsonData}";
producer.send(new ProducerRecord<String, String>(kafkaInTopicName, key, value)).get(); ```


1 Ответ

0 голосов
/ 31 января 2019

Из строк props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Server); и String bootstrap_Server = "x.x.x.x" кажется, что ваш сервер начальной загрузки не содержит порт, поэтому он должен быть "xxxx: 9092", где "xxxx" - это IP-адрес одного из брокеров Kafka в вашемкластер.

...