Мой код Kafka Producer просто работает без каких-либо исключений, но данные не передаются брокерам - PullRequest
0 голосов
/ 19 сентября 2018

Я создал тему "test_topic_02" и вручную записал данные в брокер, который успешно завершился.Но когда я создаю данные с помощью следующего кода, запись данных в брокер не работает.

object KafkaProducer {

    private val log: slf4j.Logger = LoggerFactory.getLogger(this.getClass)
    Logger.getLogger("org").setLevel(Level.WARN)

    def main(args: Array[String]): Unit = {

        val topic = "test_topic_02"
        val brokers = "10.31.31.45:9092"
        val props = new Properties()
        var partition: Int = 0
        //partition
        val list: List[Int] = List(0, 1, 2, 3, 4)
        props.put("bootstrap.servers", brokers)
        props.put("client.id", "KafkaProducer")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        //create producer
        val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
        while (true) {
            for (i <- 1 to 100) {
                for (j <- list) {
                    partition = j
                    try {
                        val producerData = new ProducerRecord[String, String](topic, Integer.valueOf(partition), "message from simulator_" + Integer.toString(i), Integer.toString(i))
                        val future: Future[RecordMetadata] = producer.send(producerData)
                        println(producerData)
                        //implicit number to long 
                        future.get(long2Long(3), TimeUnit.SECONDS)
                        println("Message Sent Successfully")
                        Thread.sleep(1000)
                    } catch {
                        case e : Exception =>
                        log.error("Launching Failed")
                    }
                }
            }
        }
        println("Stop Producing Data")
        producer.close()
    }
}

1 Ответ

0 голосов
/ 19 сентября 2018

Вы должны использовать другую функцию send , которая принимает обратный вызов для записи в Kafka.В обратном вызове вы можете искать исключения или ошибки.

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   }
               });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...