Мы выполняем рабочий процесс Dataflow, который использует Kafka и записывает мгновенные файлы avro в gcs, используя API записи Apache Beam AvroIO. Мы подготовили максимум 13 рабочих, которые должны обрабатывать 50 000 запросов в секунду для входящих событий. Мы используем LogAppendTime для сообщений kafka. Размер каждой записи похож на другой. Окно 1 час со следующим триггером:
Repeatedly
.forever(
AfterFirst.of(
AfterPane.elementCountAtLeast(50000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
)
)
.orFinally(AfterWatermark.pastEndOfWindow())
События производятся для Кафки со скоростью 30 тыс. Футов в секунду. Через некоторое время рабочие смогли справиться только со скоростью 25 тыс. Запросов в секунду, отставание водяного знака от данных увеличилось примерно на 2 часа. Поэтому мы обновили рабочий процесс, предполагая, что это решит проблему. После обновления в течение первого часа он смог обработать 45 тыс. Запросов в секунду, как и ожидалось, и отметка уровня данных уменьшалась. После этого загрузка ЦП большинства рабочих снизилась, а количество операций в секунду упало до 20 КБ. Это привело к увеличению лага водяных знаков в данных, поскольку события передавались в Kafka со скоростью 30 тыс. Кадров в секунду.
В ходе дальнейших исследований мы обнаружили, что большая часть загрузки ЦП работника является низкой и колеблется от 15% до 20%, у 2 из них загрузка ЦП составляет 40%, а у одной из них загрузка ЦП составляет 60%. Из журналов видно, что те, у кого загрузка ЦП выше, пишут в gcs чаще, чем те, которые загружают ЦП. Мы установили numShards на 26, предполагая, что они будут равномерно распределены среди рабочих. Однако похоже, что поток данных назначает большинство из них одним и тем же работникам.
Подробности рабочего процесса:
job_id: 2018-05-08_10_39_57-9264166384462032078
numShards: 26
maxNumWorkers: 13