Может ли потребитель читать записи из раздела, в котором хранятся данные с определенным значением ключа? - PullRequest
0 голосов
/ 26 февраля 2019

Вместо создания множества тем я создаю раздел для каждого потребителя и сохраняю данные с помощью ключа.Итак, есть ли способ заставить потребителя в группе потребителей читать из раздела, в котором хранятся данные определенного ключа.Если да, то можете ли вы подсказать, как это можно сделать с помощью kafka-python (или любой другой библиотеки).

Ответы [ 2 ]

0 голосов
/ 27 февраля 2019

Я считаю, что то, что вы пытаетесь достичь, не является лучшей практикой в ​​долгосрочной перспективе.

Если я понял, вам нужно определить раздел, к которому подключится потребитель, на основе ключа ключа.message.

Я полагаю, что издатели используют "разделитель по умолчанию".

Технически вы можете определить раздел темы, повторно используя в потребителе тот же алгоритм, что и в получателе.Здесь Java-код DefaultPartitioner.Вы можете адаптировать его в Python.

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

Важная часть в вашем сценарии использования, когда ключ установлен:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

И метод Utils.murmur2:

public static int murmur2(final byte[] data) {
    int length = data.length;
    int seed = 0x9747b28c;
    // 'm' and 'r' are mixing constants generated offline.
    // They're not really 'magic', they just happen to work well.
    final int m = 0x5bd1e995;
    final int r = 24;

    // Initialize the hash to a random value
    int h = seed ^ length;
    int length4 = length / 4;

    for (int i = 0; i < length4; i++) {
        final int i4 = i * 4;
        int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
        k *= m;
        k ^= k >>> r;
        k *= m;
        h *= m;
        h ^= k;
    }

    // Handle the last few bytes of the input array
    switch (length % 4) {
        case 3:
            h ^= (data[(length & ~3) + 2] & 0xff) << 16;
        case 2:
            h ^= (data[(length & ~3) + 1] & 0xff) << 8;
        case 1:
            h ^= data[length & ~3] & 0xff;
            h *= m;
    }

    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;

    return h;
}

Почему я считаю, что это не лучшее решение?

Если вы добавите новый раздел в свою тему, DefaultPartitioner предоставит вам partition id, которыйможет отличаться от partition id, возвращенного до добавления нового раздела.И по умолчанию существующие сообщения не перераспределяются, что означает, что у вас будут сообщения с одинаковым ключом в разных разделах.

И такое же поведение происходит на стороне потребителя.После обновления количества разделов потребитель будет пытаться использовать сообщения из другого раздела.Вы пропустите сообщения из предыдущего раздела, используемого для этого ключа.

0 голосов
/ 26 февраля 2019

Вместо использования подписки и связанной логики групп потребителей вы можете использовать логику «назначить» (например, она предоставляется клиентским Java-клиентом Kafka).Хотя с подпиской на тему и являясь частью группы потребителей, разделы автоматически присваиваются потребителям и перебалансируются, когда новый потребитель присоединяется или уходит, это отличается от использования assign.С назначением, потребитель просит быть назначенным определенному разделу.Это не часть какой-либо группы потребителей.Это также означает, что вы отвечаете за перебалансировку, если потребитель умирает: например, если потребитель 1 получает назначенный раздел 1, но в какой-то момент происходит сбой, раздел 1 не будет автоматически переназначаться другому потребителю.Вы должны написать и обработать логику перезапуска потребителя (или другого) для получения сообщений из раздела 1.

...