Потоковое приложение Kafka не масштабируется с большим количеством процессорных ядер и потоков на одной машине - PullRequest
0 голосов
/ 02 ноября 2018

Я пишу приложение kafka-streams, которое в основном извлекает ключи 2 типов из записей avro и считает их в указанных окнах. Он должен обрабатывать ~ 6 тыс. Событий в секунду.

Проблема, с которой я столкнулся:

  • один c4.8xlarge экземпляр с потоками num.stream.threads = 20 (количество разделов в теме ввода) потребляет всего ~ 2,5 тыс. Событий в секунду
  • тот же экземпляр с num.stream.threads = 10 потоками потребляет события с той же скоростью
  • четыре c4.2xlarge экземпляра с num.stream.threads = 5 потребляют до 10-25 тыс. Событий в секунду

Я никогда не видел, чтобы загрузка процессора была выше 70%. Сеть его тоже недоиспользуется.

Вот мой конфиг потоков:

kafka.streaming {  
  compression.type = "lz4"
  acks = 1
  retries = 1

  // I care about throughput more than about latency
  max.poll.records = 6000
  fetch.min.bytes = 3300000 // 6000 * 550 (average record size)
  fetch.max.wait.ms = 1000 // we get 6000 records in 1 second
  batch.size = 165000 // (6000 / 20) * 550
  linger.ms = 1000
}

Брокерская версия: 0.10.2.1

Версия потоков Кафки: 1.1.1

Это кажется удивительным, потому что я думал, что могу масштабировать обработку kafka линейно, если имеется достаточно разделов, независимо от того, где расположены потребители, на одной машине или на нескольких.

Многие экземпляры EC2 могут решить проблему масштабируемости, но я хотел бы запустить свое приложение на одном, потому что агрегаты должны быть представлены через интерактивные запросы, и я не хочу разрабатывать уровень RPC.

UPD: определение потока

signalStream
  .map[EventDetailsGroup, java.lang.Short]((_, v) => new KeyValue(extractEventDetailsGroup(v), Short.box(1)))
  .groupByKey(Serialized.`with`(eventDetailsSerde, Serdes.Short()))   
  .windowedBy(TimeWindows.of(30 * 60 * 1000).advanceBy(60 * 1000))   
  .count(Materialized.as("store-name").withCachingDisabled().withLoggingDisabled())
...