У меня есть конвейер DataFlow, пытающийся построить индекс (пары ключ-значение) и вычислить некоторые метрики (например, количество значений на ключ). Входные данные составляют около 60 ГБ и хранятся в GCS, а в конвейере выделено около 126 рабочих. На каждого стэкдрайвера загрузка всех процессоров составляет около 6%.
Трубопровод, похоже, не продвигается, несмотря на то, что в нем 126 работников, и, исходя из времени стены, узкое место кажется простым шагом подсчета, который следует за группой. В то время как на все остальные этапы уходит в среднем менее 1 часа, на этап подсчета ушло уже 50 дней. Кажется, нет никакой полезной информации обо всех предупреждениях в журнале.
Шаг подсчета был реализован после соответствующего шага в примере WordCount:
def count_keywords_per_product(self, key_and_group):
key, group = key_and_group
count = 0
for e in group:
count += 1
self.stats.product_counter.inc()
self.stats.keywords_per_product_dist.update(count)
return (key, count)
Предыдущий шаг "Ключевые слова группы" представляет собой простое преобразование beam.GroupByKey ().
Пожалуйста, сообщите, в чем может быть причина и как это можно оптимизировать.
Current resource metrics:
Current vCPUs 126
Total vCPU time 1,753.649 vCPU hr
Current memory 472.5 GB
Total memory time 6,576.186 GB hr
Current PD 3.08 TB
Total PD time 43,841.241 GB hr
Current SSD PD 0 B
Total SSD PD time 0 GB hr
Total Shuffle data processed 1.03 TB
Billable Shuffle data processed 529.1 GB
Шаги конвейера, включая подсчет, можно увидеть ниже: