Кафка выдает многозначное сообщение - PullRequest
0 голосов
/ 21 мая 2018

Я новичок в работе с Kafka.

Могу ли я отправить сообщение с несколькими значениями?у меня есть этот пример, который создает сообщение со случайным числом, однако я хотел бы добавить временную метку для каждой создаваемой записи.Какой стандартный способ сделать это?

    while(true){
        for(int key=0; key < 10000; key++){
            Random rand = new Random();
            int  n = rand.nextInt(50) + 1;

            SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
            Date date = new Date();
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("java-topic", Integer.toString(1),Integer.toString(n));
            producer.send(producerRecord);
            Thread.sleep(10000);
        }
        //producer.close();
    }

Ответы [ 2 ]

0 голосов
/ 21 мая 2018

Спасибо за помощь !!Выходной ответ

while(true){
    for(int key=0; key < 10000; key++){
        Random rand = new Random();
        Date date = new Date();
        SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");

        int  n = rand.nextInt(50) + 1;
        String today = formatter.format(date);
        String msg = String.format("{\"numbers\": %s, \"timestamp\": \"%s\"}", n, today);

        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                "java-topic", Integer.toString(1), msg);
        producer.send(producerRecord);
        Thread.sleep(1000);
    }
    //producer.close();
}
0 голосов
/ 21 мая 2018

Внутри Kafka брокер (сервер) хранит только байтовый массив.Таким образом, вам придется закодировать ваше сообщение, чтобы оно содержало все значения.
Популярный способ сделать это - использовать кодировку JSON в сообщении

int numbers[] = {1,2,3};
String msg = String.format("{\"numbers\": %s, \"timestamp\": \"%s\"}", 
                java.util.Arrays.toString(numbers), timestamp);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
       "java-topic", msg);

Примечание добавлена ​​временная меткасервером kafka при получении сообщения, которое можно увидеть в ConsumerRecord.timestamp ()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...