Моя версия Kafka Cluster - 0.11.0.3
Вот мой код scala, который использовался для создания данных для кластера Kafka:
val props = new Properties()
props.put("bootstrap.servers", "192.168.0.240:9092,192.168.0.213:9092,192.168.0.235:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
var counter = 0
var keyFlag = 0
while(true){
counter +=1
keyFlag +=1
val content: String = userlogs()
producer.send(new ProducerRecord[String, String]("test2", s"key-$keyFlag", content))
if(0 == counter%200){
counter = 0
Thread.sleep(2000)
}
}
producer.close()
}
def userlogs()={
// produce data...
}
, но когда я запускаю код, потребительне потребляет данные.
Кафка-клиент также имеет версию maven 0.11.0.3. но путь ниже полезен:
./kafka-console-producer.sh --broker-list 192.168.0.240:9092,192.168.0.213:9092,192.168.0.235:9092 --topic test2