Почему я испытываю серьезное снижение производительности при использовании kafka в ограниченной сети? - PullRequest
0 голосов
/ 17 января 2019

Я пытаюсь использовать Apache Kafka в сценарии с 24 узлами, где соединение между узлами характеризуется потерей пакетов и длительным временем приема-передачи (~ 1–4 секунды).

Я использую KAFKA в качестве паб / суб-брокера, поэтому все потребители и производители получают информацию от централизованного брокера.

Я удостоверился, что назначил уникальный идентификатор группы каждому потребителю / издателю.

Если я запускаю свой код в сети Enterprise (очень маленький RTT, без потери пакетов), все работает хорошо, но если я пытаюсь запустить свой код в ограниченной сети, я могу наблюдать, как сообщения доставляются центральному посреднику, но тогда не отскочил к другим потребителям. Фактически каждый узел получает большинство первых двух сообщений, но затем он прекращает получать новые.

Пока что я не обнаружил никаких ошибок или исключений.

Я также развернул приемника на машине-посреднике, и он, похоже, получает сообщения нормально.

Я также пытался использовать консольную утилиту потребителя с одного из узлов, и она тоже не могла получать сообщения, поэтому я начал задаваться вопросом, может ли проблема быть связана с конфигурацией брокера.

У меня есть другие протоколы на основе TCP (NATS, MQTT, REDIS), и они прекрасно работают.

Интересно, есть ли какая-то конкретная конфигурация, которую я должен добавить, чтобы сделать KAFKA более отказоустойчивым? Это мои конфигурации Consumer и Producer:

Потребитель:

properties.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _server);
properties.put (ConsumerConfig.GROUP_ID_CONFIG, _groupID);
properties.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
KafkaBroker.KEY_DESERIALIZER);
properties.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
KafkaBroker.VALUE_DESERIALIZER);
properties.put ("enable.auto.commit", "true");
properties.put ("auto.commit.interval.ms", "1000");
properties.put ("session.timeout.ms", "30000");

Производитель:

properties.put ("acks", "all");
properties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _server);
properties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
KafkaBroker.KEY_SERIALIZER);  
properties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaBroker.VALUE_SERIALIZER);
//If the request fails, the producer can automatically retry,
properties.put (ProducerConfig.RETRIES_CONFIG, 0);
properties.put (ProducerConfig.CLIENT_ID_CONFIG, _clientID);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...