Я пишу приложение kafka-streams, которое в основном извлекает ключи 2 типов из записей avro и считает их в указанных окнах. Он должен обрабатывать ~ 6 тыс. Событий в секунду.
Проблема, с которой я столкнулся:
- один
c4.8xlarge
экземпляр с потоками num.stream.threads = 20
(количество разделов в теме ввода) потребляет всего ~ 2,5 тыс. Событий в секунду
- тот же экземпляр с
num.stream.threads = 10
потоками потребляет события с той же скоростью
- четыре
c4.2xlarge
экземпляра с num.stream.threads = 5
потребляют до 10-25 тыс. Событий в секунду
Я никогда не видел, чтобы загрузка процессора была выше 70%. Сеть его тоже недоиспользуется.
Вот мой конфиг потоков:
kafka.streaming {
compression.type = "lz4"
acks = 1
retries = 1
// I care about throughput more than about latency
max.poll.records = 6000
fetch.min.bytes = 3300000 // 6000 * 550 (average record size)
fetch.max.wait.ms = 1000 // we get 6000 records in 1 second
batch.size = 165000 // (6000 / 20) * 550
linger.ms = 1000
}
Брокерская версия: 0.10.2.1
Версия потоков Кафки: 1.1.1
Это кажется удивительным, потому что я думал, что могу масштабировать обработку kafka линейно, если имеется достаточно разделов, независимо от того, где расположены потребители, на одной машине или на нескольких.
Многие экземпляры EC2 могут решить проблему масштабируемости, но я хотел бы запустить свое приложение на одном, потому что агрегаты должны быть представлены через интерактивные запросы, и я не хочу разрабатывать уровень RPC.
UPD: определение потока
signalStream
.map[EventDetailsGroup, java.lang.Short]((_, v) => new KeyValue(extractEventDetailsGroup(v), Short.box(1)))
.groupByKey(Serialized.`with`(eventDetailsSerde, Serdes.Short()))
.windowedBy(TimeWindows.of(30 * 60 * 1000).advanceBy(60 * 1000))
.count(Materialized.as("store-name").withCachingDisabled().withLoggingDisabled())