РЕДАКТИРОВАТЬ
В случае, если кто-то еще находится в этой конкретной ситуации, я получил что-то похожее на то, что я искал после настройки пользовательских конфигураций.Я создал производителя, который отправлял сообщения о приоритетах в три отдельные темы (для высоких / средних / низких приоритетов), а затем я создал 3 отдельных потребителя для потребления из каждой.Затем я часто опрашивал темы с более высоким приоритетом и не опрашивал более низкие приоритеты, если только высокий не был пустым:
while(true) {
final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);
final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
if (!consumerRecordsHigh.isEmpty()) {
//process high pri records
} else {
final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
if (!consumerRecordsMed.isEmpty()) {
//process med pri records
Время ожидания опроса (аргумент метода .poll()
)определяет, как долго ждать, если нет записей для опроса.Я установил это на очень короткое время для каждой темы, но вы можете установить его ниже для более низких приоритетов, чтобы убедиться, что он не потребляет ценные циклы ожидания, когда имеются сообщения с высоким pri
max.poll.records
config, очевидно, определяет максимальное количество записей, которые можно получить в одном опросе.Это может быть установлено выше и для более высоких приоритетов.
Конфигурация max.poll.interval.ms
определяет время между опросами - сколько времени должно занимать обработка сообщений max.poll.records
.Пояснение здесь .
Кроме того, я считаю, что приостановка / возобновление работы целого потребителя / темы может быть реализована следующим образом:
kafkaConsumer.pause(kafkaConsumer.assignment())
if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
kafkaConsumer.resume(kafkaConsumer.assignment());
}
Я не уверен, что это лучший способ, но я не смог найти хороший пример в другом месте
Я согласен с Сэнсэйу ниже, что это не такдействительно правильное использование для Кафки.Это однопотоковая обработка, каждая тема имеет отдельного потребителя, но я буду работать над улучшением этого процесса.
Фон
Мы пытаемся улучшить наше приложение и надеемся использовать Apache Kafka для обмена сообщениями между отделенными компонентами.Наша система часто имеет низкую пропускную способность (хотя бывают случаи, когда пропускная способность может быть высокой в течение некоторого времени) и имеет небольшие высокоприоритетные сообщения, которые должны обрабатываться, пока большие файлы ждут, или обрабатываются медленно, чтобы использовать меньшую пропускную способность.Мы хотели бы иметь темы с разными приоритетами.
Я новичок в Kafka, но безуспешно пытался изучить как Processor API, так и Kafka Streams, хотя некоторые сообщения на форумах, кажется, говорят, что это выполнимо.
API процессора
Когда я попробовал Processor API
, я попытался определить, обрабатывает ли высокий приоритет KafkaConsumer
что-либо в данный момент, проверив, если poll()
был пуст, а затем надеялся на poll()
с Med Priority Consumer, но второй опрос темы вернулся пустым.Также не было простого способа получить все TopicPartition
по теме для вызова kafkaConsumer.pause(partitions)
.
Kafka Streams
Когда я попытался KafkaStreams
, я настроил поток для потребления из каждой из моих «приоритетных» тем, но не было никакого способа проверить, был ли экземпляр KStream
или KafkaStreams
, связанный с темой с более высоким приоритетом, в настоящее время бездействующимили обработка.
Я основал свой код на этом файле
Другое
Я также попробовал код здесь: priority-kafka-client , но он не работал должным образом, поскольку запуск загруженного тестового файла имел смешанные приоритеты.
Я нашел этот поток , где один из разработчиков говорит (адрес добавления приоритетов для тем): «... пользователь может реализовать это поведение с паузой и возобновлением».Но я не смог выяснить, как он имел в виду, что это может работать.
Я нашел эту статью StackOverflow, но, похоже, они используют очень старую версию, и я не знал, как ихФункция отображения должна была работать.
Заключение
Я был бы очень признателен, если бы кто-то сказал мне, если они думают, что это что-то стоящее.Если это не так, как предполагается, что Apache Kafka работает, потому что он нарушает преимущества, полученные от автоматической обработки тем / разделов, это нормально, и я посмотрю в другом месте.Однако было так много случаев, когда люди, казалось, имели успех с этим, что я хотел попробовать.Спасибо.