Не работает с localhost на Кафку в HDP Sandbox 2.6.5 - PullRequest
0 голосов
/ 12 сентября 2018

Я пишу клиенту Kafka как:

public class BasicProducerExample {
   public static void main(String[] args){
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
       props.put(ProducerConfig.ACKS_CONFIG, "all");
       props.put(ProducerConfig.RETRIES_CONFIG, 0);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       //props.put(ProducerConfig.
       props.put("batch.size","16384");// maximum size of message 

       Producer<String, String> producer = new KafkaProducer<String, String>(props);
       TestCallback callback = new TestCallback();
       Random rnd = new Random();
       for (long i = 0; i < 2 ; i++) {
           //ProducerRecord<String, String> data = new ProducerRecord<String, String>("dke", "key-" + i, "message-"+i );
           //Topci and Message
           ProducerRecord<String, String> data = new ProducerRecord<String, String>("dke", ""+i);
           producer.send(data, callback);
       }

       producer.close();
   }
   private static class TestCallback implements Callback {
       @Override
       public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if (e != null) {
               System.out.println("Error while producing message to topic :" + recordMetadata);
               e.printStackTrace();
           } else {
               String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
               System.out.println(message);
           }
       }
   }
}

ВЫВОД: Ошибка при создании сообщения в теме: ноль org.apache.kafka.common.errors.TimeoutException: не удалось обновить метаданные после 60000 мс.

Примечание: Порт посредника: localhost: 6667 работает.

Ответы [ 2 ]

0 голосов
/ 11 февраля 2019

Я использую Apache Kafka для установки Hortonworks (релиз HDP 2.X). Обнаруженное сообщение об ошибке означает, что производитель Kafka не смог отправить данные в файл журнала сегмента. С консоли командной строки это будет означать 2 вещи:

  1. Вы используете неверный порт для брокеров
  2. Ваша конфигурация слушателя в server.properties не работает

Если при записи через scala api появляется сообщение об ошибке, дополнительно проверьте соединение с кластером kafka, используя telnet <cluster-host> <broker-port>

ПРИМЕЧАНИЕ: Если вы используете scala api для создания темы, брокерам требуется некоторое время, чтобы узнать о вновь созданной теме. Таким образом, сразу после создания темы производители могут завершиться с ошибкой Failed to update metadata after 60000 ms.

Я сделал следующие проверки для решения этой проблемы:

Первое отличие после проверки через Ambari заключается в том, что брокеры Kafka прослушивают порт 6667 на HDP 2.x (apache kafka использует 9092).

listeners=PLAINTEXT://localhost:6667

Далее используйте ip вместо localhost. Я выполнил netstat -na | grep 6667

tcp        0      0 192.30.1.5:6667        0.0.0.0:*               LISTEN     
tcp        1      0 192.30.1.5:52242       192.30.1.5:6667        CLOSE_WAIT 
tcp        0      0 192.30.1.5:54454       192.30.1.5:6667        TIME_WAIT

Итак, я изменил вызов производителя для пользователя по IP, а не по localhost:

./kafka-console-producer.sh --broker-list 192.30.1.5:6667 --topic rdl_test_2

Чтобы отслеживать, записываются ли новые записи, следите за папкой /kafka-logs.

cd /kafka-logs/<topic name>/
ls -lart
-rw-r--r--.  1 kafka hadoop        0 Feb 10 07:24 00000000000000000000.log
-rw-r--r--.  1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex
-rw-r--r--.  1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index

Как только производитель успешно пишет, лог-файл сегмента 00000000000000000000.log будет увеличиваться в размере.

см. Размер ниже:

-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index
-rw-r--r--. 1 kafka hadoop       **45** Feb 10 09:16 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex

На этом этапе вы можете запустить consumer-console.sh:

./kafka-console-consumer.sh --bootstrap-server 192.30.1.5:6667 --topic rdl_test_2 --from-beginning
response is hello world

После этого шага, если вы хотите создавать сообщения через API Scala, измените значение listeners (с localhost на публичный IP) и перезапустите брокеров Kafka через Ambari:

listeners=PLAINTEXT://192.30.1.5:6667 

Пример производителя будет следующим:

package com.scalakafka.sample
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}

class SampleKafkaProducer {
  case class KafkaProducerConfigs(brokerList: String = "192.30.1.5:6667") {
    val properties = new Properties()
    val batchsize :java.lang.Integer = 1

    properties.put("bootstrap.servers", brokerList)
    properties.put("key.serializer", classOf[StringSerializer])
    properties.put("value.serializer", classOf[StringSerializer])
    //    properties.put("serializer.class", classOf[StringDeserializer])
        properties.put("batch.size", batchsize)
    //    properties.put("linger.ms", 1)
    //    properties.put("buffer.memory", 33554432)
  }

  val producer = new KafkaProducer[String, String](KafkaProducerConfigs().properties)

  def produce(topic: String, messages: Iterable[String]): Unit = {
    messages.foreach { m =>
      println(s"Sending $topic and message is $m")
      val result = producer.send(new ProducerRecord(topic, m)).get()
      println(s"the write status is ${result}")
    }
    producer.flush()
    producer.close(10L, TimeUnit.MILLISECONDS)
  }
}

Надеюсь, это кому-нибудь поможет.

0 голосов
/ 12 сентября 2018

В вашей собственности для BOOTSTRAP_SERVERS_CONFIG попробуйте изменить номер порта на 6667.

Спасибо.

- Hiren

...