Я использую Apache Kafka для установки Hortonworks (релиз HDP 2.X). Обнаруженное сообщение об ошибке означает, что производитель Kafka не смог отправить данные в файл журнала сегмента. С консоли командной строки это будет означать 2 вещи:
- Вы используете неверный порт для брокеров
- Ваша конфигурация слушателя в server.properties не работает
Если при записи через scala api появляется сообщение об ошибке, дополнительно проверьте соединение с кластером kafka, используя telnet <cluster-host> <broker-port>
ПРИМЕЧАНИЕ: Если вы используете scala api для создания темы, брокерам требуется некоторое время, чтобы узнать о вновь созданной теме. Таким образом, сразу после создания темы производители могут завершиться с ошибкой Failed to update metadata after 60000 ms.
Я сделал следующие проверки для решения этой проблемы:
Первое отличие после проверки через Ambari заключается в том, что брокеры Kafka прослушивают порт 6667
на HDP 2.x (apache kafka использует 9092).
listeners=PLAINTEXT://localhost:6667
Далее используйте ip вместо localhost.
Я выполнил netstat -na | grep 6667
tcp 0 0 192.30.1.5:6667 0.0.0.0:* LISTEN
tcp 1 0 192.30.1.5:52242 192.30.1.5:6667 CLOSE_WAIT
tcp 0 0 192.30.1.5:54454 192.30.1.5:6667 TIME_WAIT
Итак, я изменил вызов производителя для пользователя по IP, а не по localhost:
./kafka-console-producer.sh --broker-list 192.30.1.5:6667 --topic rdl_test_2
Чтобы отслеживать, записываются ли новые записи, следите за папкой /kafka-logs
.
cd /kafka-logs/<topic name>/
ls -lart
-rw-r--r--. 1 kafka hadoop 0 Feb 10 07:24 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex
-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index
Как только производитель успешно пишет, лог-файл сегмента 00000000000000000000.log
будет увеличиваться в размере.
см. Размер ниже:
-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index
-rw-r--r--. 1 kafka hadoop **45** Feb 10 09:16 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex
На этом этапе вы можете запустить consumer-console.sh:
./kafka-console-consumer.sh --bootstrap-server 192.30.1.5:6667 --topic rdl_test_2 --from-beginning
response is hello world
После этого шага, если вы хотите создавать сообщения через API Scala, измените значение listeners
(с localhost на публичный IP) и перезапустите брокеров Kafka через Ambari:
listeners=PLAINTEXT://192.30.1.5:6667
Пример производителя будет следующим:
package com.scalakafka.sample
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
class SampleKafkaProducer {
case class KafkaProducerConfigs(brokerList: String = "192.30.1.5:6667") {
val properties = new Properties()
val batchsize :java.lang.Integer = 1
properties.put("bootstrap.servers", brokerList)
properties.put("key.serializer", classOf[StringSerializer])
properties.put("value.serializer", classOf[StringSerializer])
// properties.put("serializer.class", classOf[StringDeserializer])
properties.put("batch.size", batchsize)
// properties.put("linger.ms", 1)
// properties.put("buffer.memory", 33554432)
}
val producer = new KafkaProducer[String, String](KafkaProducerConfigs().properties)
def produce(topic: String, messages: Iterable[String]): Unit = {
messages.foreach { m =>
println(s"Sending $topic and message is $m")
val result = producer.send(new ProducerRecord(topic, m)).get()
println(s"the write status is ${result}")
}
producer.flush()
producer.close(10L, TimeUnit.MILLISECONDS)
}
}
Надеюсь, это кому-нибудь поможет.