Чтобы публиковать сообщения, используя Kafka, я использую имя класса в качестве темы:
kafkaProducer.send(new ProducerRecord(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));
А потребители подписываются на интересующие их классы:
for(Object sub:_subscriptions)
topics.add(sub.getClass().getName());
_kafkaConsumer.subscribe(topics);
Проблема в том, что только один из потребителей получает подписанное сообщение. Насколько я понимаю, kafka назначит каждому подписчику уникальный раздел (если есть). В настоящее время у меня есть только 2 подписчика, и мой kafka server.properties указал 4 раздела. Похоже, что все потребители читают из одного раздела. Возможно, Кафка - плохой выбор для служебного автобуса из-за этого очевидного ограничения. Любая помощь будет высоко ценится!
Кафка потребительских свойств:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
properties.put("group.id", "TestGroup");
properties.put("auto.offset.reset","earliest");
Свойства производителя Kafka:
properties.put("bootstrap.servers",_settings.getEndpoint());
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Свойства сервера (единственное, что я изменил из свойств по умолчанию):
num.partitions=4
Примечание: я также пробовал пользовательские настройки как:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.commit.interval.ms","1000");
properties.put("enable.auto.commit", "true");
properties.put("group.id", "testGroup");
properties.put("auto.offset.reset","latest");
properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");