SideInput I / O убивает производительность - PullRequest
0 голосов
/ 24 сентября 2019

Я строю конвейер потока данных, используя Python SDK 2.15.0.В этом конвейере мне нужно присоединить дополнительные данные к каждому элементу на нескольких этапах моего конвейера.

Все эти дополнительные данные считываются из файлов avro в облачном хранилище Google (та же зона, используемая как для потока данных, так и для корзины GCS).), организованный как кортежи значения ключа с помощью функции map, а затем переданный в качестве бокового ввода в DoFn с помощью pvalue.AsDict ().Сторонние входные данные не изменятся во время выполнения конвейера.

Первое соединение (размер бокового ввода ~ 1 МБ) проходит действительно хорошо.Однако второе объединение действительно страдает от плохой работы.Его боковой вход составляет около 50 МБ.

График выполнения потока данных ясно показывает, что является причиной плохой производительности: примерно 90% времени, затрачиваемого на мой шаг ParDo, тратится на чтение бокового ввода.Объем данных, считанных из sideinput, на порядок превышает его фактический размер, хотя я использую только четыре рабочих узла.

Могу ли я что-нибудь сделать, чтобы предотвратить это?Нужно ли как-то настраивать размер рабочего кэша?Было бы лучше подготовить дополнительные данные в методе настройки моего DoFn вместо того, чтобы передавать их как sideinput?

Вот как я готовлю боковые входы:

sideinput_1 = pvalue.AsDict(p | "Read side input data 1" >> beam.io.ReadFromAvro("gs:/bucket/small_file.avro",0,False,True) \
                              | "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"],x["VALUE"])))

# Preparing data for later join
sideinput_2 = pvalue.AsDict(p | "Read side input data 2" >> beam.io.ReadFromAvro("gs://bucket/bigger_file.avro",0,False,True) \
                              | "Prepare side input data 2" >> beam.Map(lambda x: ((x["KEYCOL1"],x["KEYCOL2"],x["KEYCOL3"]),x)))

Использование боковых входов:


matching = p | "Read address data" >> beam.io.Read(beam.io.BigQuerySource(query=sql_addr, use_standard_sql=True)) \
                 | "Join w/ sideinput1" >> beam.ParDo(Join1(), sideinput_1 ).with_outputs('unmatched', main='matched')                                                                                

result = matching["matched"] | "Join Sideinput 2" >> beam.ParDo(Join2(), sideinput_2 )

Метод процесса DoFn просто содержит поиск ключа в боковом вводе и, основываясь на наличии совпадения, добавляет некоторые дополнительные данные к элементу.

...