Я пытаюсь оптимизировать производительность Kafka в сценарии, где есть высокая задержка (> 500 мс) и периодическая потеря пакетов. Я работаю с JAVA и использую API-интерфейс kafka_2.13, версия: 2.5.0. У меня 24 узла, подключенных к одному брокеру, каждый узел пытается отправить небольшое сообщение всем остальным подписчикам. Я наблюдаю, что все узлы способны обмениваться данными, когда нет потери пакетов или задержки, но, похоже, они не могут обмениваться данными вскоре после того, как я добавлю задержку и потерю пакетов. Я проведу больше тестов в понедельник, но мне было интересно, есть ли у кого-нибудь какие-либо предложения по возможным улучшениям конфигурации.
После этого вы можете увидеть код, который я вижу, чтобы опубликовать sh и получать сообщения, а затем различные конфигурации, которые используется для потребителей и производителей.
Издатели:
boolean sendAsyncMessage (byte[] value, String topic) {
ProducerRecord<Long, byte[]> record = new ProducerRecord<> (topic, System.currentTimeMillis (), value);
long msStart = System.currentTimeMillis ();
producer.send (record, (metadata, exception) -> {
long msDelta = System.currentTimeMillis () - msStart;
logger.info ("Message with topic {} sent at {}, was ack after {}", topic, msStart, msDelta);
if (metadata == null) {
logger.info ("An exception was triggered during send:" + exception.toString ());
}
});
producer.flush ();
return true;
}
Подписчики:
while (keepGoing.get ()) {
try {
// java example do it every time!
subscribe ();
ConsumerRecords<Long, byte[]> consumerRecords = consumer.poll (Duration.ofMillis (2000));
manageMessage (consumerRecords);
//Thread processRecords = new Thread (() -> manageMessage (consumerRecords));
//processRecords.start ();
} catch (Exception e) {
logger.error ("Problem in polling: " + e.toString ());
}
}
Производитель:
properties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaBroker.KEY_SERIALIZER);
properties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaBroker.VALUE_SERIALIZER);
properties.put (ProducerConfig.ACKS_CONFIG, reliable ? "1" : 0);
// host1:port1,host2:port2
properties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
// how many bytes to buffer records waiting to be sent to the server
//properties.put (ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put (ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
properties.put (ProducerConfig.CLIENT_ID_CONFIG, clientID);
//15 MINUTES
properties.put (ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 54000000);
// MAX UNCOMPRESSED MESSAGE SIZE
// properties.put (ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
// properties.put (ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
properties.put (ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 300);
properties.put (ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
Потребитель
properties.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaBroker.KEY_DESERIALIZER);
properties.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaBroker.VALUE_DESERIALIZER);
// host1:port1,host2:port2
properties.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
// should be the topic
properties.put (ConsumerConfig.GROUP_ID_CONFIG, groupID);
properties.put (ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.put (ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");