Here is what the code looks like:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
# other packages
from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@daily',
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

dag = DAG('scraper', schedule_interval=None,
          default_args=default_args)

t1 = BashOperator(
    task_id='scrape',
    dag=dag,
    bash_command='python E:/dag/Car_Scraper.py')

# setting dependencies
dag >> t1
This is the directory structure: E:\dag\
Car_Scraper.py is in the same folder as the DAG file, but I still get this error:
*** Reading local file: /home/fatima/airflow/logs/scraper/scrape/2020-04-09T13:36:29.132535+00:00/1.log
[2020-04-09 18:37:26,538] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: scraper.scrape 2020-04-09T13:36:29.132535+00:00 [queued]>
[2020-04-09 18:37:26,557] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: scraper.scrape 2020-04-09T13:36:29.132535+00:00 [queued]>
[2020-04-09 18:37:26,558] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-04-09 18:37:26,558] {taskinstance.py:867} INFO - Starting attempt 1 of 2
[2020-04-09 18:37:26,559] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-04-09 18:37:26,580] {taskinstance.py:887} INFO - Executing <Task(BashOperator): scrape> on 2020-04-09T13:36:29.132535+00:00
[2020-04-09 18:37:26,610] {standard_task_runner.py:53} INFO - Started process 6740 to run task
[2020-04-09 18:37:26,748] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: scraper.scrape 2020-04-09T13:36:29.132535+00:00 [running]> LHRLT-2364.Systemsltd.local
[2020-04-09 18:37:26,775] {bash_operator.py:82} INFO - Tmp dir root location:
/tmp
[2020-04-09 18:37:26,782] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmpFZ5Gy3/scrapeMm2Ten
[2020-04-09 18:37:26,782] {bash_operator.py:115} INFO - Running command: python E:/dag/Car_Scraper.py
[2020-04-09 18:37:26,819] {bash_operator.py:122} INFO - Output:
[2020-04-09 18:37:26,855] {bash_operator.py:126} INFO - python: can't open file 'E:/dag/Car_Scraper.py': [Errno 2] No such file or directory
[2020-04-09 18:37:26,855] {bash_operator.py:130} INFO - Command exited with return code 2
[2020-04-09 18:37:26,872] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/bash_operator.py", line 134, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2020-04-09 18:37:26,876] {taskinstance.py:1151} INFO - Marking task as UP_FOR_RETRY
[2020-04-09 18:37:36,504] {logging_mixin.py:112} INFO - [2020-04-09 18:37:36,502] {local_task_job.py:103} INFO - Task exited with return code 1
Is there another way to run it? I came across the PythonOperator, but I don't know how to use it.
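For reference, a minimal sketch of the kind of callable a PythonOperator could be pointed at. The helper name `run_script` and the placeholder path are hypothetical and not part of my DAG. The operator wiring is shown only as a comment and assumes the Airflow 1.10 `PythonOperator` arguments `python_callable` and `op_kwargs`:

```python
import subprocess
import sys


def run_script(script_path):
    """Run a Python script in a child process and return its exit code.

    subprocess.check_call raises CalledProcessError on a non-zero exit,
    which would make the surrounding Airflow task fail.
    """
    # sys.executable is the interpreter running this code, so the script
    # runs with the same Python that the Airflow worker uses.
    return subprocess.check_call([sys.executable, script_path])


# Hypothetical wiring inside the DAG file (not executed here):
# t1 = PythonOperator(
#     task_id='scrape',
#     python_callable=run_script,
#     op_kwargs={'script_path': '/path/to/Car_Scraper.py'},  # placeholder path
#     dag=dag,
# )
```

The script path still has to exist on the machine where the task actually runs, so this alone would not fix a path that the worker cannot see.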
Edit: I moved the script to a separate directory and changed the code slightly:
# airflow related
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
# other packages
from datetime import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@daily',
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

dag = DAG('scraper', schedule_interval=None,
          default_args=default_args)

templated_command = """
cd ..\\scripts\\
python Car_Scraper.py
"""

download = BashOperator(
    task_id='download_release',
    bash_command=templated_command,
    dag=dag)

# setting dependencies
dag >> download
But this gives the following output:
*** Reading local file: /home/fatima/airflow/logs/scraper/download_release/2020-04-09T14:56:45.487450+00:00/2.log
[2020-04-09 19:58:24,958] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: scraper.download_release 2020-04-09T14:56:45.487450+00:00 [queued]>
[2020-04-09 19:58:24,966] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: scraper.download_release 2020-04-09T14:56:45.487450+00:00 [queued]>
[2020-04-09 19:58:24,966] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-04-09 19:58:24,966] {taskinstance.py:867} INFO - Starting attempt 2 of 2
[2020-04-09 19:58:24,966] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-04-09 19:58:24,977] {taskinstance.py:887} INFO - Executing <Task(BashOperator): download_release> on 2020-04-09T14:56:45.487450+00:00
[2020-04-09 19:58:24,991] {standard_task_runner.py:53} INFO - Started process 10874 to run task
[2020-04-09 19:58:25,073] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: scraper.download_release 2020-04-09T14:56:45.487450+00:00 [running]> LHRLT-2364.Systemsltd.local
[2020-04-09 19:58:25,085] {bash_operator.py:82} INFO - Tmp dir root location:
/tmp
[2020-04-09 19:58:25,088] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmpupGsj5/download_releaseTNOufD
[2020-04-09 19:58:25,088] {bash_operator.py:115} INFO - Running command:
cd ..\scripts\
python hello.py
[2020-04-09 19:58:25,105] {bash_operator.py:122} INFO - Output:
[2020-04-09 19:58:25,109] {bash_operator.py:126} INFO - /tmp/airflowtmpupGsj5/download_releaseTNOufD: line 3: cd: too many arguments
[2020-04-09 19:58:25,109] {bash_operator.py:130} INFO - Command exited with return code 1
[2020-04-09 19:58:25,119] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/bash_operator.py", line 134, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2020-04-09 19:58:25,120] {taskinstance.py:1170} INFO - All retries failed; marking task as FAILED.dag_id=scraper, task_id=download_release, execution_date=20200409T145645, start_date=20200409T145824, end_date=20200409T145825
[2020-04-09 19:58:34,941] {logging_mixin.py:112} INFO - [2020-04-09 19:58:34,940] {local_task_job.py:103} INFO - Task exited with return code 1
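One possible reading of both logs: the paths `/home/fatima/airflow/...` and `/usr/local/lib/python2.7/dist-packages/...` suggest the tasks run in a Linux environment, where a Windows-style path such as `E:/dag/Car_Scraper.py` does not resolve. In the second attempt, bash appears to treat the backslashes as escape characters, and the trailing backslash joins the `cd` line with the next one, which would explain `cd: too many arguments`. The sketch below converts such paths to POSIX form. It assumes the Linux side is WSL with drives mounted under `/mnt/<letter>`, which the logs do not confirm. The helper name `windows_to_wsl_path` and the `mount_root` parameter are hypothetical.

```python
import re


def windows_to_wsl_path(path, mount_root='/mnt'):
    """Translate 'E:/dag/x.py' or 'E:\\dag\\x.py' into '/mnt/e/dag/x.py'.

    Paths without a drive letter only get their backslashes replaced
    with forward slashes.
    """
    # Bash treats '\' as an escape character, so use '/' everywhere.
    path = path.replace('\\', '/')
    match = re.match(r'^([A-Za-z]):/(.*)$', path)
    if match is None:
        return path  # relative or already POSIX-style path
    drive, rest = match.groups()
    # Assumption: the drive is mounted at <mount_root>/<lowercase letter>.
    return '{}/{}/{}'.format(mount_root, drive.lower(), rest)


# Building a bash command from a converted path:
script = windows_to_wsl_path('E:/dag/Car_Scraper.py')
bash_command = 'python {}'.format(script)
```

If the environment is not WSL, the mount point would differ, and the script would need to be copied or mounted somewhere the Airflow worker can read it.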