Я пытаюсь использовать Apache Kafka в сценарии с 24 узлами, где соединение между узлами характеризуется потерей пакетов и длительным временем приема-передачи (~ 1–4 секунды).
Я использую KAFKA в качестве паб / суб-брокера, поэтому все потребители и производители получают информацию от централизованного брокера.
Я удостоверился, что назначил уникальный идентификатор группы каждому потребителю / издателю.
Если я запускаю свой код в сети Enterprise (очень маленький RTT, без потери пакетов), все работает хорошо, но если я пытаюсь запустить свой код в ограниченной сети, я могу наблюдать, как сообщения доставляются центральному посреднику, но тогда не отскочил к другим потребителям. Фактически каждый узел получает большинство первых двух сообщений, но затем он прекращает получать новые.
Пока что я не обнаружил никаких ошибок или исключений.
Я также развернул приемника на машине-посреднике, и он, похоже, получает сообщения нормально.
Я также пытался использовать консольную утилиту потребителя с одного из узлов, и она тоже не могла получать сообщения, поэтому я начал задаваться вопросом, может ли проблема быть связана с конфигурацией брокера.
У меня есть другие протоколы на основе TCP (NATS, MQTT, REDIS), и они прекрасно работают.
Интересно, есть ли какая-то конкретная конфигурация, которую я должен добавить, чтобы сделать KAFKA более отказоустойчивым? Это мои конфигурации Consumer и Producer:
Потребитель:
properties.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _server);
properties.put (ConsumerConfig.GROUP_ID_CONFIG, _groupID);
properties.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
KafkaBroker.KEY_DESERIALIZER);
properties.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaBroker.VALUE_DESERIALIZER);
properties.put ("enable.auto.commit", "true");
properties.put ("auto.commit.interval.ms", "1000");
properties.put ("session.timeout.ms", "30000");
Производитель:
properties.put ("acks", "all");
properties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, _server);
properties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
KafkaBroker.KEY_SERIALIZER);
properties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaBroker.VALUE_SERIALIZER);
//If the request fails, the producer can automatically retry,
properties.put (ProducerConfig.RETRIES_CONFIG, 0);
properties.put (ProducerConfig.CLIENT_ID_CONFIG, _clientID);