Как отправить Map в тему Kafka, чтобы ключ ProducerRecord совпадал с соответствующим ключом Map - PullRequest
0 голосов
/ 02 октября 2018

Я использую потоковую передачу Spark, и данные отправляются в Kafka.Я отправляю карту Кафке.Предполагая, что у меня есть элементы Map из 20 (которые могут увеличиться до 1000 за период потоковой передачи), как показано ниже:

HashMap<Integer,String> input = new HashMap<Integer,String>();
        input.put(11,"One");
        input.put(312,"two");
        input.put(33,"One");
        input.put(24,"One");
        input.put(35,"One");
        input.put(612,"One");
        input.put(7,"One");
        input.put(128,"One");
        input.put(9,"One");
        input.put(10,"One");
        input.put(11,"One1");
        input.put(12,"two1");
        input.put(13,"One1");
        input.put(14,"One1");
        input.put(15,"One1");
        input.put(136,"One1");
        input.put(137,"One1");
        input.put(158,"One1");
        input.put(159,"One1");
        input.put(120,"One1");



        Set<Integer> inputKeys = input.keySet();
        Iterator<Integer> inputKeysIterator = inputKeys.iterator();
        while (inputKeysIterator.hasNext()) {
            Integer key = inputKeysIterator.next();
            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic,
                    key%10, input.get(key));
            KafkaProducer.send(record);
        }

В моей теме Kafka 10 разделов.Здесь я звоню kafkaProducer.send () 20 раз и, следовательно, 20 звонков Кафки.Как я могу отправить целые данные в пакете, то есть в одном вызове Kafka, но опять же я хочу убедиться, что каждая запись идет в определенный раздел, управляемый по формуле ключ% 10 , как в

Запись ProducerRecord = новая запись ProducerRecord (тема, ключ% 10 , input.get (ключ));

Опции, которые я вижу: linger.ms = 1 может обеспечить это, но с задержкой 1 мс.Как избежать этой задержки и избежать 20 сетевых (Kafka) вызовов или минимизировать вызовы Kafka?

1 Ответ

0 голосов
/ 03 октября 2018

API Kafka Producer уже отправляет сообщения в пакетном режиме, даже если вы по отдельности вызываете

См. batch.size в документации, это байты, а не сообщения, но вы можете вызвать фактическое сетевое событиепозвонив flush на Producer

Что касается разделов, вам нужно будет создать свой Partitioner кода.Простая передача значения мода в качестве ключа не гарантирует, что у вас не будет коллизии хэшей в секционере по умолчанию

...