У меня проблема с Airflow. Первое задание в группе обеспечения доступности баз данных всегда успешно запускается и заканчивается, но второе никогда не запускается автоматически.
Я пытаюсь очистить задание в пользовательском интерфейсе, но оно не запускается, если я хочу, чтобы он работал, мне нужно удалить запущенные задания в базе данных,
delete from job where state='running'
Но у меня не так много заданий в рабочем состоянии, у меня есть только 1 задание SchedulerJob с последнимСердцебиение в порядке, и 16 внешних датчиков задач ожидают этого DAG
В пуле 150 слотов и 16 запущенных и 1 запланированных.
- У меня работает планировщик воздушного потока
- У меня работает веб-сервер airflow
- Все веб-группы обеспечения доступности баз данных в веб-интерфейсе имеют значение Вкл.
- У всех групп обеспечения доступности баз данных есть дата начала, которая находится в прошлом
- Iсбросить планировщик за несколько часов до
А это код в потоке воздуха
default_args = {
'owner': 'airgia',
'depends_on_past': False,
'retries': 2,
'start_date': datetime(2018, 12, 1, 0, 0),
'email': ['xxxx@yyyy.net'],
'email_on_failure': False,
'email_on_retry': False
}
dag = DAG('trigger_snapshot',
default_args=default_args,
dagrun_timeout= timedelta(hours=22),
schedule_interval="0 0 * * 1,2,3,4,5,7",
max_active_runs=1,
catchup=False
)
set_exec_dt = PythonOperator(
task_id='set_exec_dt',
python_callable=set_exec_dt_variable,
dag=dag,
pool='capser')
lanza_crawler = PythonOperator(
task_id='lanza_crawler',
op_kwargs={"crawler_name": crawler_name},
python_callable=start_crawler,
dag=dag,
pool='capser')
copy_as_processed = PythonOperator(
task_id='copy_as_processed',
op_kwargs={"source_bucket": Variable.get("bucket"),
"source_key": snapshot_key,
"dest_bucket": Variable.get("bucket"),
"dest_key": "{0}_processed".format(snapshot_key)},
python_callable=s3move,
dag=dag,
pool='capser')
airflow_snapshot = S3KeySensor(
task_id='airflow_snapshot',
bucket_key=snapshot_key,
wildcard_match=True,
bucket_name=Variable.get("bucket"),
timeout=8*60*60,
poke_interval=120,
dag=dag,
pool='capser')
Fin_DAG_TC = DummyOperator(
task_id='Fin_DAG_TC',
dag=dag,
pool='capser')
airflow_snapshot >> lanza_crawler >> set_exec_dt >> copy_as_processed >> Fin_DAG_TC
И это то, что я вижу, когда я подключаюсь к веб-интерфейсу каждое утро
оператор ноль
[EDIT]
Это последний журнал для планировщика
Здесь мы можем увидеть вызов для второго задания (lanza_crawler), но не запуск.
[2018-12-11 03: 50: 54,209] {{jobs.py:1109}} INFO - Задачи для выполнения:
[2018-12-11 03: 50: 54,240] {{jobs.py:1180}} ИНФОРМАЦИЯ - DAG trigger_snapshot имеет 0/16 выполняющихся и поставленных в очередь задач
[2018-12-11 03: 50: 54,240] {{jobs.py:1218}} INFO- Установка следующих задач в состояние очереди:
[2018-12-11 03: 50: 54,254] {{jobs.py:1301}} INFO - Установка следующих задач в очередьсостояние:
[2018-12-11 03: 50: 54,255] {{jobs.py:1343}} INFO - отправка ('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo =), 1) исполнителю с приоритетом 4 и очередью по умолчанию
[2018-12-11 03: 50: 54,255] {{base_executor.py:56}} INFO - Добавление в очередь: запуск воздушного потока trigger_snapshot lanza_crawler 2018-12-10T00: 00: 00 + 00: 00 --local --pool capser -sd / usr / local / airflow / dags / capser / trigger_snapshot.py
[2018-12-11 03: 50: 54,262] {{celery_executor.py:83}} INFO - организация очереди [celery] ('trigger_snapshot', 'lanza_crawler', datetime.datetime (2018, 12, 10, 0, 0, tzinfo =), 1) через сельдерей, очередь = по умолчанию
[2018-12-11 03: 50: 54,749] {{jobs.py: 1447}} INFO - Исполнитель сообщает, что trigger_snapshot.airflow_snapshot execute_date = 2018-12-10 00: 00: 00 + 00: 00 как успешный для try_number 1
/ usr / local / airflow / dags / capser / trigger_snapshot.py 1,53 с 2018-12-11T03: 50: 54
...
/ usr / local / airflow / dags / capser / trigger_snapshot.py 6866 0,68 с 1,54 с 2018-12-11T03: 56: 50
и это последний журнал для работника
[2018-12-11 03: 50: 52,718: INFO / ForkPoolWorker-11]Задача airflow.executors.celery_executor.execute_command [9a2e1ae7-9264-47d8-85ff-cac32a542708] успешно выполнена в 13847.525094523095s: отсутствует
[2018-12-11 03: 50: 54,505: задача INFO / MainProcess] получитьairflow.executors.celery_executor.execute_command [9ff70fc8-45ef-4751-b274-71e242553128]
[2018-12-11 03: 50: 54,983] {{settings.py:174}} INFO - setting.configure_orm (): использование настроек пула,pool_size = 5, pool_recycle = 1800
[2018-12-11 03: 50: 55,422] {{_ _init__.py:51}} INFO - Использование исполнителя CeleryExecutor
[2018-12-11 03: 50: 55,611] {{models.py:271}} ИНФОРМАЦИЯ - Заполнение DagBag из /usr/local/airflow/dags/capser/DAG_AURORA/DAG_AURORA.py
[2018-12-11 03: 50: 55,970] {{cli.py:484}} INFO - Запуск на хосте ip- - - - * .eu-west-1.compute.internal