рассылка кафки сообщений между потребителями - PullRequest
0 голосов
/ 01 марта 2019

У меня очень простой случай использования kafka, когда я сталкиваюсь с проблемой распределения сообщений между двумя разделами.

У меня есть 2 раздела по теме, и у меня есть 2 потребителя для каждого.Я вижу, что в определенный раздел отправлено больше сообщений, и только один потребитель получает сообщения для обработки, а другой, подписанный на раздел, имеющий меньше сообщений, бездействует вечно.оба потребителя имеют одинаковый идентификатор группы.Я не могу добиться горизонтального масштабирования с этой проблемой.

Ниже приведены ключевые конфигурации, которые я добавляю.

kafka.session.timeout.ms=10000
kafka.auto.commit=false
kafka.maxpoll.interval.ms=50000
kafka.request.timeout.ms=15000
kafka.maxpoll.records=100

** PS: ** имена взяты из моего файла проп и не точно совпадают с реальными именами свойств kafka.Мне нужен большой интервал максимального опроса, чтобы обработать большой кусок за один раз.Любое предположение, что мне нужно добавить в конфигурации или изменить его?

Ответы [ 2 ]

0 голосов
/ 02 марта 2019

Как уже упоминалось в другом ответе, kafka использует хеш ключа для определения раздела.Возможно, ваш ключ распределен неравномерно. В таких случаях вы можете определить собственную стратегию выбора раздела по производителю при создании записей.Создайте пользовательский класс partitoner и реализуйте его метод разбиения следующим образом.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;

public class CustomPartitioner implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("We expect all messages to have a key");
        // Your logic to decide partition based on key
        return 0;// Here return thr partition decided based on key
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub

    }
}

В конфигурации производителя добавьте следующее

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getCanonicalName());
property 
0 голосов
/ 01 марта 2019

Производитель Kafka: производители отправляют записи в раздел на основе ключа записи.Разделитель по умолчанию для Java использует хэш ключа записи для выбора раздела или использует метод циклического перебора, если запись не имеет ключа.Поэтому для большей масштабируемости всегда используйте уникальный ключ для сообщения

Производители публикуют данные по темам по своему выбору.Производитель несет ответственность за выбор записи, которую следует назначить тому или иному разделу в теме. Это может быть сделано в циклическом режиме, просто чтобы сбалансировать нагрузку, или это может быть сделано в соответствии с некоторой семантической функцией разделения (скажем, на основе некоторого ключа в записи).Подробнее об использовании разбиения за секунду!

Если записи имеют одинаковый ключ, то они заканчиваются на том же разделе

Вы также можете отправитьзапись в определенный раздел

public ProducerRecord(String topic,
          Integer partition,
          K key,
          V value)

Создает запись для отправки в указанную тему и раздел

...