Я строю конвейер потока данных, используя Python SDK 2.15.0.В этом конвейере мне нужно присоединить дополнительные данные к каждому элементу на нескольких этапах моего конвейера.
Все эти дополнительные данные считываются из файлов avro в облачном хранилище Google (та же зона, используемая как для потока данных, так и для корзины GCS).), организованный как кортежи значения ключа с помощью функции map, а затем переданный в качестве бокового ввода в DoFn с помощью pvalue.AsDict ().Сторонние входные данные не изменятся во время выполнения конвейера.
Первое соединение (размер бокового ввода ~ 1 МБ) проходит действительно хорошо.Однако второе объединение действительно страдает от плохой работы.Его боковой вход составляет около 50 МБ.
График выполнения потока данных ясно показывает, что является причиной плохой производительности: примерно 90% времени, затрачиваемого на мой шаг ParDo, тратится на чтение бокового ввода.Объем данных, считанных из sideinput, на порядок превышает его фактический размер, хотя я использую только четыре рабочих узла.
Могу ли я что-нибудь сделать, чтобы предотвратить это?Нужно ли как-то настраивать размер рабочего кэша?Было бы лучше подготовить дополнительные данные в методе настройки моего DoFn вместо того, чтобы передавать их как sideinput?
Вот как я готовлю боковые входы:
sideinput_1 = pvalue.AsDict(p | "Read side input data 1" >> beam.io.ReadFromAvro("gs:/bucket/small_file.avro",0,False,True) \
| "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"],x["VALUE"])))
# Preparing data for later join
sideinput_2 = pvalue.AsDict(p | "Read side input data 2" >> beam.io.ReadFromAvro("gs://bucket/bigger_file.avro",0,False,True) \
| "Prepare side input data 2" >> beam.Map(lambda x: ((x["KEYCOL1"],x["KEYCOL2"],x["KEYCOL3"]),x)))
Использование боковых входов:
matching = p | "Read address data" >> beam.io.Read(beam.io.BigQuerySource(query=sql_addr, use_standard_sql=True)) \
| "Join w/ sideinput1" >> beam.ParDo(Join1(), sideinput_1 ).with_outputs('unmatched', main='matched')
result = matching["matched"] | "Join Sideinput 2" >> beam.ParDo(Join2(), sideinput_2 )
Метод процесса DoFn просто содержит поиск ключа в боковом вводе и, основываясь на наличии совпадения, добавляет некоторые дополнительные данные к элементу.