Я пытаюсь реализовать способ произвольного доступа к сообщениям от Kafka, используя KafkaConsumer.assign (раздел), KafkaConsumer.seek (раздел, смещение).
А затем прочитайте опрос для одного сообщения.
В этом случае я не могу получить 500 сообщений в секунду. Для сравнения, если я «подписываюсь» на раздел, я получаю 100 000+ мс / сек. (Размер сообщения 1000 байтов)
Я пробовал:
- Брокер, Zookeeper, Потребитель на одном хосте и на разных хостах. (репликация не используется)
- 1 и 15 перегородок
- конфигурация потоков по умолчанию в "server.properties" и увеличена до 20 (io и сеть)
- Один потребитель назначается разному разделу каждый раз и одному потребителю на раздел
- Один поток для потребления и несколько потоков для потребления (вызов нескольких различных потребителей)
- Добавление двух брокеров и новая тема с разделами на обоих брокерах
- Запуск нескольких процессов Kafka Consumer
- Изменение размеров сообщений 5k, 50k, 100k -
Во всех случаях минимум, который я получаю, составляет ~ 200 мсг / с. И максимум 500, если я использую 2-3 темы. Но, если перейти выше, вызов ".poll ()" будет продолжаться дольше и дольше (начиная с 3-4 мс в одном потоке до 40-50 мс с 10 потоками).
Мое наивное понимание кафки заключается в том, что потребитель открывает соединение с брокером и отправляет запрос на получение небольшой части его журнала. Хотя все это имеет некоторую задержку, и получение пакета сообщений будет намного лучше - я мог бы предположить, что оно будет масштабироваться в зависимости от количества задействованных приемников, за счет увеличения использования сервера как на виртуальной машине, на которой работают потребители, так и ВМ работает с брокером. Но оба они бездельничают.
Так что, по-видимому, на стороне брокера происходит некоторая синхронизация, но я не могу понять, связано ли это с моим использованием Kafka или каким-то внутренним ограничением использования .seek
Я бы оценил некоторые намеки на то, должен ли я попробовать что-то еще, или это все, что я могу получить.