Мы все относительно новички здесь, в Airflow, поэтому этот вопрос немного смутил всех нас.В рамках этой конкретной DAG наш Инженер установил DAG для обратной засыпки.Передача контекста в ETC и т. Д. В настоящее время мы видим два оператора в журнале с очень разными датами выполнения (см. Ниже).Также обратите внимание, что я сократил журналы, поэтому я не просто вставляю в них все.
Я также хотел включить DAG, так как он существует в нашем экземпляре Airflow.Все выглядит правильно, и это отражает то, что мы сделали в прошлом.
Проверка кода как для DAG, так и для сценария, который используется для извлечения данных из Google.Все выглядит хорошо для меня.Я довольно новичок, когда дело доходит до этих вещей, поэтому я мог бы что-то здесь упустить.
Журнал для скрипта
*** Reading local file: /usr/local/airflow/logs/kroger/create_log_dir/2018-03-04T07:00:00+00:00/1.log
[2019-03-29 20:37:26,562] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: kroger.create_log_dir 2018-03-04T07:00:00+00:00 [queued]>
[2019-03-29 20:37:26,566] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: kroger.create_log_dir 2018-03-04T07:00:00+00:00 [queued]>
[2019-03-29 20:37:26,566] {{models.py:1571}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2019-03-29 20:37:26,577] {{models.py:1593}} INFO - Executing <Task(PythonOperator): create_log_dir> on 2018-03-04T07:00:00+00:00
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2019-04-09 01:04:21,054] {{models.py:1593}} INFO - Executing <Task(PythonOperator): create_log_dir> on 2018-03-04T07:00:00+00:00
[2019-04-09 01:04:21,054] {{base_task_runner.py:118}} INFO - Running:
[2019-04-09 01:04:26,031] {{logging_mixin.py:95}} INFO - [2019-04-09 01:04:26,030] {{jobs.py:2527}} INFO - Task exited with return code 0
DAG в том виде, в каком он сейчас стоит
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
# 'email': [''],
# 'email_on_failure': True,
# 'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=15),
# 'schedule_interval':'0 10 * * *',
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2019, 4, 12),
}
dag = DAG(dag_id='kroger',
default_args=args,
schedule_interval="0 7 * * *",
catchup=True)
t0 = PythonOperator(task_id='create_log_dir',
python_callable=utils.create_account_dirs,
op_args=['kroger'],
dag=dag)
# on_failure_callback=send_notification)
t1 = PythonOperator(task_id = 'build_ds_table',
python_callable=build_schema.create_schema,
provide_context=True,
dag=dag)
# on_failure_callback=send_notification)
t2 = PythonOperator(task_id = 'ds',
python_callable= ds.upload_ds_report,
provide_context=True,
dag=dag)
t0 >> t1
t1 >> t2
В настоящее время все выглядит так, как ожидается, в отношении данных, которые мыпри получении.Я только что просматривал ЛОГИ и несколько операторов на логи, вид меня немного беспокоил.Пожалуйста, дайте мне знать, если есть какая-либо дополнительная информация, которую я мог бы предоставить здесь, чтобы сделать этот вопрос лучше для вас.