Вы бы действительно использовали xcom
. Для этой цели объект task_instance
предоставляет два нескольких метода: xcom_push
и xcom_pull
.
Чтобы pu sh значение xcom
, вам необходимо предоставить контекст для вашего "python сопоставимая "функция. Это сделает объект task_instance
доступным для функции.
Поле environment
для DockerOperator
является шаблоном. Это означает, что он поддерживает макросы . Вы можете использовать макрос {{ task_instance }}
.
def make_folder(folder_path:str, date:str, **context):
download_path= folder_path + date
os.mkdir(download_path)
task_instance = context['task_instance']
task_instance.xcom_push(key="download_path", value=download_path)
task_1 = PythonOperator(
task_id="make_folder",
provide_context=True,
python_callable=make_folder,
op_kwargs={'folder_path': '/my_path_', 'date':'str(datetime.date(datetime.today()))'}
)
task_2 = DockerOperator(
task_id='docker',
image='file_processor:latest',
api_version='auto',
auto_remove=False,
command='',
environment={
'DPATH': "{{ task_instance.xcom_pull(task_ids='make_folder', key='download_path') }}"
},
docker_url="unix://var/run/docker.sock",
network_mode="bridge"
)
task_1 >> task_2