Apache - Airflow 1.10.1 не начать работу - PullRequest
0 голосов
/ 09 декабря 2018

У меня проблема с Airflow. Первое задание в группе обеспечения доступности баз данных всегда успешно запускается и заканчивается, но второе никогда не запускается автоматически.

Я пытаюсь очистить задание в пользовательском интерфейсе, но оно не запускается, если я хочу, чтобы он работал, мне нужно удалить запущенные задания в базе данных,

delete from job where state='running'

Но у меня не так много заданий в рабочем состоянии, у меня есть только 1 задание SchedulerJob с последнимСердцебиение в порядке, и 16 внешних датчиков задач ожидают этого DAG

В пуле 150 слотов и 16 запущенных и 1 запланированных.

  • У меня работает планировщик воздушного потока
  • У меня работает веб-сервер airflow
  • Все веб-группы обеспечения доступности баз данных в веб-интерфейсе имеют значение Вкл.
  • У всех групп обеспечения доступности баз данных есть дата начала, которая находится в прошлом
  • Iсбросить планировщик за несколько часов до

А это код в потоке воздуха

default_args = {
  'owner': 'airgia',
  'depends_on_past': False,
  'retries': 2,
  'start_date': datetime(2018, 12, 1, 0, 0),
  'email': ['xxxx@yyyy.net'],
  'email_on_failure': False,
  'email_on_retry': False
}

dag = DAG('trigger_snapshot',
      default_args=default_args,
      dagrun_timeout= timedelta(hours=22),
      schedule_interval="0 0 * * 1,2,3,4,5,7",
      max_active_runs=1,
      catchup=False
   )

set_exec_dt = PythonOperator(
    task_id='set_exec_dt',
    python_callable=set_exec_dt_variable,
    dag=dag,
    pool='capser')

lanza_crawler = PythonOperator(
    task_id='lanza_crawler',
    op_kwargs={"crawler_name": crawler_name},
    python_callable=start_crawler,
    dag=dag,
    pool='capser')

copy_as_processed =  PythonOperator(
    task_id='copy_as_processed',
    op_kwargs={"source_bucket": Variable.get("bucket"),
           "source_key": snapshot_key,
           "dest_bucket": Variable.get("bucket"),
           "dest_key": "{0}_processed".format(snapshot_key)},
    python_callable=s3move,
    dag=dag,
    pool='capser')

airflow_snapshot = S3KeySensor(
    task_id='airflow_snapshot',
    bucket_key=snapshot_key,
    wildcard_match=True,
    bucket_name=Variable.get("bucket"),
    timeout=8*60*60,
    poke_interval=120,
    dag=dag,
    pool='capser')


Fin_DAG_TC = DummyOperator(
    task_id='Fin_DAG_TC',
    dag=dag,
pool='capser')


airflow_snapshot >> lanza_crawler >> set_exec_dt >> copy_as_processed >> Fin_DAG_TC

И это то, что я вижу, когда я подключаюсь к веб-интерфейсу каждое утро

оператор ноль


[EDIT]

Это последний журнал для планировщика

Здесь мы можем увидеть вызов для второго задания (lanza_crawler), но не запуск.

[2018-12-11 03: 50: 54,209] {{jobs.py:1109}} INFO - Задачи для выполнения:

[2018-12-11 03: 50: 54,240] {{jobs.py:1180}} ИНФОРМАЦИЯ - DAG trigger_snapshot имеет 0/16 выполняющихся и поставленных в очередь задач

[2018-12-11 03: 50: 54,240] {{jobs.py:1218}} INFO- Установка следующих задач в состояние очереди:

[2018-12-11 03: 50: 54,254] ​​{{jobs.py:1301}} INFO - Установка следующих задач в очередьсостояние:

[2018-12-11 03: 50: 54,255] {{jobs.py:1343}} INFO - отправка ('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo =), 1) исполнителю с приоритетом 4 и очередью по умолчанию

[2018-12-11 03: 50: 54,255] {{base_executor.py:56}} INFO - Добавление в очередь: запуск воздушного потока trigger_snapshot lanza_crawler 2018-12-10T00: 00: 00 + 00: 00 --local --pool capser -sd / usr / local / airflow / dags / capser / trigger_snapshot.py

[2018-12-11 03: 50: 54,262] {{celery_executor.py:83}} INFO - организация очереди [celery] ('trigger_snapshot', 'lanza_crawler', datetime.datetime (2018, 12, 10, 0, 0, tzinfo =), 1) через сельдерей, очередь = по умолчанию

[2018-12-11 03: 50: 54,749] {{jobs.py: 1447}} INFO - Исполнитель сообщает, что trigger_snapshot.airflow_snapshot execute_date = 2018-12-10 00: 00: 00 + 00: 00 как успешный для try_number 1

/ usr / local / airflow / dags / capser / trigger_snapshot.py 1,53 с 2018-12-11T03: 50: 54

...

/ usr / local / airflow / dags / capser / trigger_snapshot.py 6866 0,68 с 1,54 с 2018-12-11T03: 56: 50

и это последний журнал для работника

[2018-12-11 03: 50: 52,718: INFO / ForkPoolWorker-11]Задача airflow.executors.celery_executor.execute_command [9a2e1ae7-9264-47d8-85ff-cac32a542708] успешно выполнена в 13847.525094523095s: отсутствует

[2018-12-11 03: 50: 54,505: задача INFO / MainProcess] получитьairflow.executors.celery_executor.execute_command [9ff70fc8-45ef-4751-b274-71e242553128]

[2018-12-11 03: 50: 54,983] {{settings.py:174}} INFO - setting.configure_orm (): использование настроек пула,pool_size = 5, pool_recycle = 1800

[2018-12-11 03: 50: 55,422] {{_ _init__.py:51}} INFO - Использование исполнителя CeleryExecutor

[2018-12-11 03: 50: 55,611] {{models.py:271}} ИНФОРМАЦИЯ - Заполнение DagBag из /usr/local/airflow/dags/capser/DAG_AURORA/DAG_AURORA.py

[2018-12-11 03: 50: 55,970] {{cli.py:484}} INFO - Запуск на хосте ip- - - - * .eu-west-1.compute.internal

1 Ответ

0 голосов
/ 15 декабря 2018

В графике aws мы увидели, что 80% рабочей памяти занято, и мы решили увеличить количество работников, и проблема была решена.

...