Есть ли способ расставить приоритеты в сообщениях в Apache Kafka 2.0? - PullRequest
1 голос
/ 27 марта 2019

РЕДАКТИРОВАТЬ

В случае, если кто-то еще находится в этой конкретной ситуации, я получил что-то похожее на то, что я искал после настройки пользовательских конфигураций.Я создал производителя, который отправлял сообщения о приоритетах в три отдельные темы (для высоких / средних / низких приоритетов), а затем я создал 3 отдельных потребителя для потребления из каждой.Затем я часто опрашивал темы с более высоким приоритетом и не опрашивал более низкие приоритеты, если только высокий не был пустым:

    while(true) {
        final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
        final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);

        final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
        if (!consumerRecordsHigh.isEmpty()) {
            //process high pri records
        } else {
            final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
            if (!consumerRecordsMed.isEmpty()) {
                //process med pri records

Время ожидания опроса (аргумент метода .poll())определяет, как долго ждать, если нет записей для опроса.Я установил это на очень короткое время для каждой темы, но вы можете установить его ниже для более низких приоритетов, чтобы убедиться, что он не потребляет ценные циклы ожидания, когда имеются сообщения с высоким pri

max.poll.records config, очевидно, определяет максимальное количество записей, которые можно получить в одном опросе.Это может быть установлено выше и для более высоких приоритетов.

Конфигурация max.poll.interval.ms определяет время между опросами - сколько времени должно занимать обработка сообщений max.poll.records.Пояснение здесь .

Кроме того, я считаю, что приостановка / возобновление работы целого потребителя / темы может быть реализована следующим образом:

    kafkaConsumer.pause(kafkaConsumer.assignment())
    if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }

Я не уверен, что это лучший способ, но я не смог найти хороший пример в другом месте

Я согласен с Сэнсэйу ниже, что это не такдействительно правильное использование для Кафки.Это однопотоковая обработка, каждая тема имеет отдельного потребителя, но я буду работать над улучшением этого процесса.


Фон

Мы пытаемся улучшить наше приложение и надеемся использовать Apache Kafka для обмена сообщениями между отделенными компонентами.Наша система часто имеет низкую пропускную способность (хотя бывают случаи, когда пропускная способность может быть высокой в ​​течение некоторого времени) и имеет небольшие высокоприоритетные сообщения, которые должны обрабатываться, пока большие файлы ждут, или обрабатываются медленно, чтобы использовать меньшую пропускную способность.Мы хотели бы иметь темы с разными приоритетами.

Я новичок в Kafka, но безуспешно пытался изучить как Processor API, так и Kafka Streams, хотя некоторые сообщения на форумах, кажется, говорят, что это выполнимо.

API процессора

Когда я попробовал Processor API, я попытался определить, обрабатывает ли высокий приоритет KafkaConsumer что-либо в данный момент, проверив, если poll()был пуст, а затем надеялся на poll() с Med Priority Consumer, но второй опрос темы вернулся пустым.Также не было простого способа получить все TopicPartition по теме для вызова kafkaConsumer.pause(partitions).

Kafka Streams

Когда я попытался KafkaStreams, я настроил поток для потребления из каждой из моих «приоритетных» тем, но не было никакого способа проверить, был ли экземпляр KStream или KafkaStreams, связанный с темой с более высоким приоритетом, в настоящее время бездействующимили обработка.

Я основал свой код на этом файле

Другое

Я также попробовал код здесь: priority-kafka-client , но он не работал должным образом, поскольку запуск загруженного тестового файла имел смешанные приоритеты.

Я нашел этот поток , где один из разработчиков говорит (адрес добавления приоритетов для тем): «... пользователь может реализовать это поведение с паузой и возобновлением».Но я не смог выяснить, как он имел в виду, что это может работать.

Я нашел эту статью StackOverflow, но, похоже, они используют очень старую версию, и я не знал, как ихФункция отображения должна была работать.

Заключение

Я был бы очень признателен, если бы кто-то сказал мне, если они думают, что это что-то стоящее.Если это не так, как предполагается, что Apache Kafka работает, потому что он нарушает преимущества, полученные от автоматической обработки тем / разделов, это нормально, и я посмотрю в другом месте.Однако было так много случаев, когда люди, казалось, имели успех с этим, что я хотел попробовать.Спасибо.

1 Ответ

2 голосов
/ 27 марта 2019

Это звучит как проблема разработки в вашем приложении - kafka изначально разрабатывался как журнал фиксации, в котором каждое сообщение записывается брокеру со смещением, а различные потребители используют их в том порядке, в котором они были зафиксированы с очень низкой задержкой ивысокая пропускная способностьУчитывая, что разделы, а не темы, являются основной единицей распределения работы в Kafka, было бы трудно достичь приоритетов на уровне темы.

Я бы рекомендовал адаптировать свой дизайн для использования других архитектурных компонентов, чем Kafka, вместо того, чтобы пытатьсяподрезать ноги, чтобы вписаться в обувь.Одна вещь, которую вы уже можете сделать, это позволить вашему производителю загрузить файл в надлежащее хранилище файлов и отправить ссылку через Kafka, включая метаданные.Затем, в зависимости от состояния полосы пропускания, ваш потребитель может на основе метаданных большого файла принять решение о целесообразности загрузки или нет.Таким образом, вы, скорее всего, будете иметь более надежную конструкцию, чем неправильно использовать Kafka.

Если вы действительно хотите придерживаться только Kafka, одним из решений будет отправка больших файлов на некоторое фиксированное количество жестко закодированныхразделы и потребители потребляют эти разделы только при хорошей пропускной способности.

...