Files are not created in Apache Airflow - PullRequest
0 votes
/ 01 August 2020

I ran a DAG in Apache Airflow on my local laptop. Although the DAG completes successfully, it does not create any file, while the same code run as plain Python does create the file.

Please help with this.

Here are the task logs:

*** Reading local file: /usr/local/airflow/logs/process_sales_dag/pull_file/2020-08-01T12:51:53.299030+00:00/1.log
[2020-08-01 12:51:58,571] {{models.py:1361}} INFO - Dependencies all met for <TaskInstance: process_sales_dag.pull_file 2020-08-01T12:51:53.299030+00:00 [queued]>
[2020-08-01 12:51:58,608] {{models.py:1361}} INFO - Dependencies all met for <TaskInstance: process_sales_dag.pull_file 2020-08-01T12:51:53.299030+00:00 [queued]>
[2020-08-01 12:51:58,608] {{models.py:1573}} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2020-08-01 12:51:58,678] {{models.py:1595}} INFO - Executing <Task(PythonOperator): pull_file> on 2020-08-01T12:51:53.299030+00:00
[2020-08-01 12:51:58,678] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run process_sales_dag pull_file 2020-08-01T12:51:53.299030+00:00 --job_id 136 --raw -sd DAGS_FOLDER/datacamp_test_dag_python.py --cfg_path /tmp/tmpxnxazonk']
[2020-08-01 12:51:59,136] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:51:59,136] {{settings.py:174}} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2020-08-01 12:51:59,396] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:51:59,395] {{__init__.py:51}} INFO - Using executor LocalExecutor
[2020-08-01 12:52:01,398] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:52:01,397] {{models.py:271}} INFO - Filling up the DagBag from /usr/local/airflow/dags/datacamp_test_dag_python.py
[2020-08-01 12:52:01,522] {{base_task_runner.py:101}} INFO - Job 136: Subtask pull_file [2020-08-01 12:52:01,521] {{cli.py:484}} INFO - Running <TaskInstance: process_sales_dag.pull_file 2020-08-01T12:51:53.299030+00:00 [running]> on host 8b3414552e19
[2020-08-01 12:52:03,480] {{logging_mixin.py:95}} INFO - File pulled from https://tableconvert.com/?output=csv and saved to D:\Airflow\iris.csv
[2020-08-01 12:52:03,480] {{python_operator.py:96}} INFO - Done. Returned value was: None
[2020-08-01 12:52:09,308] {{logging_mixin.py:95}} INFO - [2020-08-01 12:52:09,307] {{jobs.py:2627}} INFO - Task exited with return code 0
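Two details in the logs are worth noticing: the task ran on host `8b3414552e19` (a container-style hostname) and the log file lives under `/usr/local/airflow/logs/`, i.e. a Linux filesystem. On Linux, a Windows path like `D:\Airflow\iris.csv` is not a drive location but an ordinary relative filename, so the file may well exist, just not where expected. A minimal diagnostic sketch (not part of the original DAG) that shows where a given savepath actually resolves on the machine running the task:

```python
import os

def report_savepath(savepath):
    """Print where a savepath actually resolves on the machine running the task."""
    resolved = os.path.abspath(savepath)
    print(f"cwd: {os.getcwd()}")
    print(f"savepath resolves to: {resolved}")
    print(f"exists: {os.path.exists(resolved)}")
    return resolved

# On Linux, 'D:\\Airflow\\iris.csv' resolves to '<cwd>/D:\\Airflow\\iris.csv',
# a file named 'D:\Airflow\iris.csv' in the current working directory.
```

Dropping a call like this into the task (or running it in the worker's Python) shows whether the write landed inside the container rather than on the Windows host.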

The Python function:

import requests

def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use print for logging; Airflow captures stdout in the task log
    print(f"File pulled from {URL} and saved to {savepath}")
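For what it's worth, a slightly hardened variant of the same function (a sketch, not the original code) fails loudly on HTTP errors and creates the target directory first, which makes silent "no file appeared" cases easier to diagnose:

```python
import os
import requests

def pull_file(URL, savepath):
    r = requests.get(URL, timeout=30)
    r.raise_for_status()  # fail the task on HTTP errors instead of saving an error page
    # Create the parent directory if it is missing, so open() cannot fail silently
    os.makedirs(os.path.dirname(savepath) or ".", exist_ok=True)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Log the absolute path, so the task log shows where the file really went
    print(f"File pulled from {URL} and saved to {os.path.abspath(savepath)}")
```

With `raise_for_status()`, a failing download marks the task as failed instead of succeeding with a bogus file.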

Here is the full Airflow DAG:

# Operator imports (BashOperator and HiveOperator are not used in this DAG)
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.hive_operator import HiveOperator
import requests

default_args = {
    'start_date': airflow.utils.dates.days_ago(2)
}

process_sales_dag = DAG(
    'process_sales_dag', default_args=default_args,
    schedule_interval="@daily")

def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL':'https://tableconvert.com/?output=csv', 'savepath':'D:\\Airflow\\iris.csv'},
    dag=process_sales_dag
)


pull_file_task
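If the file should end up on the machine where the task actually runs, one option is to build the savepath from the worker's environment instead of hard-coding a Windows drive path. A sketch under the assumption that the worker is the Linux container seen in the logs (`/usr/local/airflow` as the `AIRFLOW_HOME` fallback is taken from the log paths above, not from the original DAG):

```python
import os
from pathlib import Path

# Resolve a savepath relative to the worker's Airflow home; the fallback
# directory is an assumption based on the log output, not a guaranteed default.
airflow_home = Path(os.environ.get("AIRFLOW_HOME", "/usr/local/airflow"))
savepath = str(airflow_home / "iris.csv")
```

This `savepath` can then be passed through `op_kwargs` in place of `'D:\\Airflow\\iris.csv'`, so the write target is always valid on the host executing the task.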