Кафка медленное соединение - PullRequest
0 голосов
/ 12 октября 2018

На моем локальном ПК я запускаю сервер confluent-oss-5.0.0-2.11 kafka со свойствами сервера по умолчанию (из etc / kafka) и создал темы test1 и test2 с помощью следующей команды

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 –topic

Ниже приведены свойства среды, которые у меня есть

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
props.put(BATCH_SIZE_CONFIG, 16384);
props.put(LINGER_MS_CONFIG, 10);
props.put(BUFFER_MEMORY_CONFIG, 33554432);
props.put(METADATA_MAX_AGE_CONFIG, "10");

Вот что делает производитель

String padded = RandomStringUtils.random(2000, true, true);
for(int i=0;i<1000000;i++) {
    kafkaProducer.send("a" + i, "aa" + padded + i, "test1");
    kafkaProducer.send("a" + i, "bb" + padded + i, "test2");
}
kafkaProducer.flush();

Вот что делает потребитель

KTable<String, String> a = builder.table("test");

KTable<String, String> b = builder.table("test1");

a.join(b, new ValueJoiner<String, String, String>() {

    @Override

    public String apply(String value1, String value2) {

        return "a" + value1;

    }

}).toStream().to("finalTopic");

А ниже показано, какЯ наблюдаю за производительностью "finalTopic" населения

AtomicInteger counter = new AtomicInteger();
builder.<String, String>stream("finalTopic").peek((key, value) -> {
    if(counter.incrementAndGet()%1000 == 0) {
        logger.info("date {}, final join key {}, value size {}, joins performed {}", System.currentTimeMillis(), key, value.length(), counter.get());
    }
});   

Мне удалось обойти 55 000 сообщений в секунду , передавая вышеупомянутые сообщения в различные темы Kafka с помощью производителя.

Однакона стороне потребителя частота сообщений, заполняемых в "finalTopic", составляет около 110 сообщений в секунду .

Любой указатель приветствуется!

...