Python librdkafka продюсер выступит против родного Apache Kafka Producer - PullRequest
0 голосов
/ 05 марта 2019

Я тестирую Apache Kafka Producer с нативной реализацией java против confluent-kafka Python, чтобы увидеть, какая из них имеет максимальную пропускную способность.

Я развертываю кластер Kafka с 3 брокерами Kafka и 3 экземплярами Zookeeper с использованием docker-compose,Файл составления моего докера: https://paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw

Это очень простой код, в основном с параметрами по умолчанию для Python confluent-kafka и некоторыми изменениями конфигурации в Java-производителе, чтобы соответствовать конфигурации confluent-kafka.

PythonКод:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
    'default.topic.config': {'acks': "all"}})

ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'

def f():
    import time
    start = time.time()
    for i in xrange(1000000):
        try:
            producer.produce('test-topic', ss)
        except Exception:
            producer.poll(1)
            try:
                producer.produce('test-topic', ss)
            except Exception:
                producer.flush(30)
                producer.produce('test-topic', ss)
        producer.poll(0)
    producer.flush(30)
    print(time.time() - start)


if __name__ == '__main__':
    f()

Реализация Java.Конфигурация такая же, как конфиг в либрдкафке.Изменены linger.ms и обратный вызов в соответствии с рекомендациями Edenhill.

package com.amit.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.Charset;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaProducerExampleAsync {

    private final static String TOPIC = "test-topic";
    private final static String BOOTSTRAP_SERVERS = "kafka-1:19092,kafka-2:29092,kafka-3:39092";

    private static Producer<String, String> createProducer() {
        int bufferMemory = 67108864;
        int batchSizeBytes = 1000000;
        String acks = "all";

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSizeBytes);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1000000);
        props.put(ProducerConfig.ACKS_CONFIG, acks);

        return new KafkaProducer<>(props);
    }

    static void runProducer(final int sendMessageCount) throws InterruptedException {
        final Producer<String, String> producer = createProducer();
        final String msg = "0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357";

        final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg);
        final long[] new_time = new long[1];

        try {
            for (long index = 0; index < sendMessageCount; index++) {
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // This if-else is to only start timing this when first message reach kafka
                        if(e != null) {
                           e.printStackTrace();
                        } else {
                            if (new_time[0] == 0) {
                                new_time[0] = System.currentTimeMillis();
                            }
                        }
                    }
                });
            }
        } finally {
            // producer.flush();
            producer.close();
            System.out.printf("Total time %d ms\n", System.currentTimeMillis() - new_time[0]);
        }
    }

    public static void main(String... args) throws Exception {
        if (args.length == 0) {
            runProducer(1000000);
        } else {
            runProducer(Integer.parseInt(args[0]));
        }
    }
}

Результаты тестов (редактируются после внесения изменений, рекомендованных Edenhill)

Acks = 0 , Сообщения: 1000000

Java: 12.066

Python: 9.608 секунд

Acks: все , Сообщения:1000000

Java: 45.763 11.917 секунд

Python: 14.3029 секунд


Реализация Java выполняется так же, как и реализация Python, даже послевнося все изменения, которые я мог придумать, и те, которые предложил Edenhill в комментарии ниже.

Существуют различные тесты производительности Kafka в Python, но я не смог найти сравнения между librdkafka или python KafkaApache Kafka.

У меня есть два вопроса:

  1. Достаточно ли этого теста, чтобы прийти к выводу, что с настройками по умолчанию и сообщением размером 1 Кб librdkafka быстрее?

  2. ДоуУ кого-нибудь есть опыт или источник (блог, документ и т. д.), сравнивающий librdkafka с confluent-kafka?

1 Ответ

0 голосов
/ 05 марта 2019

Python-клиент использует librdkakfa, который переопределяет некоторые стандартные настройки Kafka.

Paramter = Kafka default
batch.size = 16384
max.in.flight.requests.per.connection = 5 (librdkafka's default is 1000000)

message.max.bytes в librdkafka может быть эквивалентно max.request.size .

Я думаю, что нет никакого эквивалента queue.buffering.max.messages в librdKafka в API производителя Кафки.Если вы найдете что-то, поправьте меня.

Кроме того, удалите параметр buffer.memory из программы Java.

https://kafka.apache.org/documentation/#producerconfigs https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Следующим шагом является то, что для загрузки классов Java требуется некоторое время.Поэтому вам нужно увеличить количество сообщений вашего производителя-производителя.Было бы замечательно, если бы на создание всех сообщений ушло не менее 20-30 минут.Затем вы можете сравнить Java-клиент с Python-клиентом.

Мне нравится идея сравнения между Python и Java-клиентом.Продолжайте публиковать свои результаты в stackoverflow.

...