Кафка - не все потребители получают подписанное сообщение - PullRequest
1 голос
/ 08 апреля 2019

Чтобы публиковать сообщения, используя Kafka, я использую имя класса в качестве темы:

kafkaProducer.send(new ProducerRecord(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));

А потребители подписываются на интересующие их классы:

    for(Object sub:_subscriptions)
        topics.add(sub.getClass().getName());
    _kafkaConsumer.subscribe(topics);

Проблема в том, что только один из потребителей получает подписанное сообщение. Насколько я понимаю, kafka назначит каждому подписчику уникальный раздел (если есть). В настоящее время у меня есть только 2 подписчика, и мой kafka server.properties указал 4 раздела. Похоже, что все потребители читают из одного раздела. Возможно, Кафка - плохой выбор для служебного автобуса из-за этого очевидного ограничения. Любая помощь будет высоко ценится!

Кафка потребительских свойств:

    properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("enable.auto.commit", "false");
    properties.put("group.id", "TestGroup");
    properties.put("auto.offset.reset","earliest");

Свойства производителя Kafka:

    properties.put("bootstrap.servers",_settings.getEndpoint());
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Свойства сервера (единственное, что я изменил из свойств по умолчанию):

num.partitions=4

Примечание: я также пробовал пользовательские настройки как:

    properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("auto.commit.interval.ms","1000");
    properties.put("enable.auto.commit", "true");
    properties.put("group.id", "testGroup");
    properties.put("auto.offset.reset","latest");
    properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

Ответы [ 2 ]

1 голос
/ 08 апреля 2019

Если все ваши потребители имеют одинаковую группу потребителей (свойство group.id), то только один потребитель из группы получит сообщение. Если вы хотите, чтобы все потребители получали сообщение, они должны иметь разные group.id.

Чтобы проверить, какие потребители связаны с разделами темы, вы можете использовать следующую команду

./bin/kafka-consumer-groups.sh --bootstrap-server yourhost:9092 --group testGroup --describe
1 голос
/ 08 апреля 2019

Kafka по умолчанию использует RangeAssignor в качестве стратегии назначения разделов, которая имеет следующие характеристики:

Назначитель диапазона работает отдельно для каждой темы.Для каждой темы мы выкладываем доступные разделы в числовом порядке и потребителей в лексикографическом порядке.Затем мы делим количество разделов на общее количество потребителей, чтобы определить количество разделов, назначаемых каждому потребителю.Если он не разделен поровну, то у первых нескольких потребителей будет один дополнительный раздел.Например, предположим, что есть два потребителя C0 и C1, две темы t0 и t1, и каждая тема имеет 3 раздела, в результате чего получаются разделы t0p0, t0p1, t0p2, t1p0, t1p1 и t1p2.Назначение будет: C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2]

Если вы хотите более равномерное распределение для небольшого числа разделов, вы можете использовать RoundRobinAssignor путем установки partition.assignment.strategy

...