Я создал тему "test_topic_02" и вручную записал данные в брокер, который успешно завершился.Но когда я создаю данные с помощью следующего кода, запись данных в брокер не работает.
object KafkaProducer {
private val log: slf4j.Logger = LoggerFactory.getLogger(this.getClass)
Logger.getLogger("org").setLevel(Level.WARN)
def main(args: Array[String]): Unit = {
val topic = "test_topic_02"
val brokers = "10.31.31.45:9092"
val props = new Properties()
var partition: Int = 0
//partition
val list: List[Int] = List(0, 1, 2, 3, 4)
props.put("bootstrap.servers", brokers)
props.put("client.id", "KafkaProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//create producer
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
while (true) {
for (i <- 1 to 100) {
for (j <- list) {
partition = j
try {
val producerData = new ProducerRecord[String, String](topic, Integer.valueOf(partition), "message from simulator_" + Integer.toString(i), Integer.toString(i))
val future: Future[RecordMetadata] = producer.send(producerData)
println(producerData)
//implicit number to long
future.get(long2Long(3), TimeUnit.SECONDS)
println("Message Sent Successfully")
Thread.sleep(1000)
} catch {
case e : Exception =>
log.error("Launching Failed")
}
}
}
}
println("Stop Producing Data")
producer.close()
}
}