Несколько тем и их приоритет - PullRequest
0 голосов
/ 22 октября 2018

Я использую pykafka для потребления сообщения, и теперь я используюалансированный_консумер для получения сообщения из одной темы.Теперь я должен потреблять сообщения из другой темы, и если это возможно, приоритетное потребление сообщений из разных тем.Как я могу справиться с этой проблемой?Может быть другая библиотека для python?

1 Ответ

0 голосов
/ 22 октября 2018

Я только что опубликовал пост об этой проблеме.

Даже несмотря на то, что я использую Java, вы можете найти описанную там концепцию полезной для вашего случая.

Что мы сделали для решения проблемырасстановка приоритетов тем Кафки -

Мы разработали механизм для расстановки приоритетов потребления тем Кафки.Такой механизм проверит, хотим ли мы обработать сообщение, которое было получено из Kafka, или задержать обработку на потом.

Мы отобразили между разделами и логическими значениями, что при необходимости блокирует использование каждого раздела, topicPartitionLocks,Блокировка предварительных, продолжая поглощать запоздалые, создает приоритетность тем.TimerTask обновляет эту карту, и наши потребители проверяют, «разрешено» ли им потреблять или ждать - как вы можете видеть в методе waitForLatePartitionIfNeeded.

public class Prioritizer extends TimerTask {
    private Map<String, Boolean> topicPartitionLocks = new ConcurrentHashMap<>();
    private Map<String, Long> topicPartitionLatestTimestamps = new ConcurrentHashMap<>();

    @Override
    public void run(){
        updateTopicPartitionLocks();
    }

    private void updateTopicPartitionLocks() {
        Optional<Long> minValue = topicPartitionLatestTimestamps.values().stream().min((o1, o2) -> (int) (o1 - o2));
        if(! minValue.isPresent()) {
            return;
        }

        Iterator it = topicPartitionLatestTimestamps.entrySet().iterator();
        while (it.hasNext()) {
            Boolean shouldLock = false;

            Map.Entry<String, Long> pair = (Map.Entry)it.next();
            String topicPartition = pair.getKey();
            if(pair.getValue() > (minValue.get() + maxGap)) {
                shouldLock = true;
                if(isSameTopicAsMinPartition(minValue.get(), topicPartition)) {
                    shouldLock = false;
                }

            }

            topicPartitionLocks.put(topicPartition, shouldLock);
        }
    }

    public boolean isLocked(String topicPartition) {
        return topicPartitionLocks.get(topicPartition).booleanValue();
    }
}

waitForLatePartitionIfNeeded метод

private void waitForLatePartitionIfNeeded(final String topic, int partition) {
    String topicPartition = topic + partition;

    prioritizer.getTopicPartitionLocks.putIfAbsent(topicPartition);

    while(prioritizer.isLocked(topicPartition)) {
        monitorWaitForLatePartitionTimes(topicPartition, startTime);
        Misc.sleep(timeToWaitBetweenGapToTardyPartitionChecks.get());
    }
}

Используя это, мы увеличили баланс, поэтому мы решили это с помощью следующих определений:

Мы изменили следующую конфигурацию в Kafka

request.timeout.ms: 7300000 (~2hrs)
max.poll.interval.ms: 7200000 (2hrs)

Графики и общие описания по проблеме вы можете проверить в моемpost:

Как я решил задержки в сообщениях Kafka, расставив приоритеты в темах Kafka

Удачи!

...