Ну, я не уверен на 100%, что это возможно, но вы можете:
- Определить файл
requirements.txt
со всеми зависимостями для выполнения конвейера - Избегатьимпорт и использование ваших зависимостей во время построения конвейера, только во временном коде выполнения.
Так, например, ваш файл может выглядеть так:
import apache_beam as beam
with beam.Pipeline(...) as p:
result = (p | ReadSomeData(...)
| beam.ParDo(MyForbiddenDependencyDoFn()))
И вВ этом же файле ваш DoFn будет импортировать вашу зависимость из кода времени выполнения конвейера, например, метод process
.См .:
class MyForbiddenDependencyDoFn(beam.DoFn):
def process(self, element):
import forbidden_dependency as fd
yield fd.totally_cool_operation(element)
Когда вы выполняете свой конвейер, вы можете сделать:
python your_pipeline.py \
--project $GCP_PROJECT \
--runner DataflowRunner \
--temp_location $GCS_LOCATION/temp \
--requirements_file=requirements.txt
Я никогда не пробовал это, но это может просто сработать:)