I ran a DAG in Apache Airflow on my local laptop. Although the DAG run completes successfully, it does not create any file, while the same code executed directly in Python does create the file.
Please help me figure out why.
Here are the task logs:
*** Reading local file: /usr/local/airflow/logs/process_sales_dag/pull_file/2020-08-01T12:51:53.299030+00:00/1.log
[2020-08-01 12:51:58,571] {{models.py:1361}} INFO - Dependencies all met for <TaskInstance: process_sales_dag.pull_file 2020-08-01T12:51:53.299030+00:00 [queued]>
[2020-08-01 12:51:58,608] {{models.py:1361}} INFO - Dependencies all met for <TaskInstance: process_sales_dag.pull_file 2020-08-01T12:51:53.299030+00:00 [queued]>
[2020-08-01 12:51:58,608] {{models.py:1573}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2020-08-01 12:51:58,678] {{models.py:1595}} INFO - Executing <Task(PythonOperator): pull_file> on 2020-08-01T12:51:53.299030+00:00
[2020-08-01 12:51:58,678] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run process_sales_dag pull_file 2020-08-01T12:51:53.299030+00:00 --job_id 136 --raw -sd DAGS_FOLDER/datacamp_test_dag_python.py --cfg_path /tmp/tmpxnxazonk']
[2020-08-01 12:51:59,136] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:51:59,136] {{settings.py:174}} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2020-08-01 12:51:59,396] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:51:59,395] {{__init__.py:51}} INFO - Using executor LocalExecutor
[2020-08-01 12:52:01,398] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:52:01,397] {{models.py:271}} INFO - Filling up the DagBag from /usr/local/airflow/dags/datacamp_test_dag_python.py
[2020-08-01 12:52:01,522] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:52:01,521] {{cli.py:484}} INFO - Running <TaskInstance: process_sales_dag.pull_file 2020-08-01T12:51:53.299030+00:00 [running]> on host 8b3414552e19
[2020-08-01 12:52:03,480] {{logging_mixin.py:95}} INFO - File pulled from https://tableconvert.com/?output=csv and saved to D:\Airflow\iris.csv
[2020-08-01 12:52:03,480] {{python_operator.py:96}} INFO - Done. Returned value was: None
[2020-08-01 12:52:09,308] {{logging_mixin.py:95}} INFO - [2020-08-01 12:52:09,307] {{jobs.py:2627}} INFO - Task exited with return code 0
The Python function:
import requests

def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use print for logging; the output appears in the task log
    print(f"File pulled from {URL} and saved to {savepath}")
And here is the entire Airflow DAG:
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import requests

default_args = {
    'start_date': airflow.utils.dates.days_ago(2)
}

process_sales_dag = DAG(
    'process_sales_dag', default_args=default_args,
    schedule_interval="@daily")

def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use print for logging; the output appears in the task log
    print(f"File pulled from {URL} and saved to {savepath}")

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL': 'https://tableconvert.com/?output=csv',
               'savepath': 'D:\\Airflow\\iris.csv'},
    dag=process_sales_dag
)

# Single task, so there are no dependencies to declare
pull_file_task
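In case it helps with reproducing this, here is a sketch of the callable with extra logging that should show where the Airflow worker actually writes the file. The pull_file_debug name and the os/socket lines are my additions for debugging, not part of the original task; only the standard library and requests are assumed:

import os
import socket
import requests

def pull_file_debug(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Log the absolute path, working directory, and host name so the
    # task log shows the worker's view of the filesystem.
    print(f"File pulled from {URL} and saved to {os.path.abspath(savepath)}")
    print(f"cwd={os.getcwd()}, host={socket.gethostname()}")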