apache airflow - Не удается загрузить пакет dag для обработки ошибки - PullRequest
0 голосов
/ 08 мая 2018

Я создал функцию on_failure_callback (ссылаясь на Воздушный поток по умолчанию on_failure_callback ) для обработки сбоя задачи.

Хорошо работает, когда в группе обеспечения доступности баз данных есть только одна задача, однако, если есть еще 2 задачи, случайная задача не выполняется , так как оператор имеет значение null , она может быть возобновлена ​​позже вручную. В airflow-scheduler.out журнал выглядит так:

[2018-05-08 14: 24: 21,237] {models.py:1595} ОШИБКА - Отчеты исполнителя Экземпляр задачи% s завершен (% s), хотя задача говорит о своем% s. Был задание убито внешне? NoneType [2018-05-08 14: 24: 21,238] {jobs.py:1435} ОШИБКА - Невозможно загрузить пакет dag, чтобы обработать ошибку для , Установка задачи на FAILED без обратные вызовы или повторные попытки. Достаточно ли у вас ресурсов?

Код DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
import airflow
from devops.util import WechatUtil
from devops.util import JiraUtil

def on_failure_callback(context):
    ti = context['task_instance']
    log_url = ti.log_url
    owner = ti.task.owner
    ti_str = str(context['task_instance'])
    wechat_msg = "%s - Owner:%s"%(ti_str,owner)
    WeChatUtil.notify_team(wechat_msg)

    jira_desc = "Please check log from url %s"%(log_url)
    JiraUtil.create_incident("DW",ti_str,jira_desc,owner)


args = {
    'queue': 'default',
    'start_date': airflow.utils.dates.days_ago(1),
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': on_failure_callback,
    'owner': 'user1',
    }
dag = DAG(dag_id='test_dependence1',default_args=args,schedule_interval='10 16 * * *')

load_crm_goods = BashOperator(
    task_id='crm_goods_job',
    bash_command='date',
    dag=dag)

load_crm_memeber = BashOperator(
    task_id='crm_member_job',
    bash_command='date',
    dag=dag)

load_crm_order = BashOperator(
    task_id='crm_order_job',
    bash_command='date',
    dag=dag)

load_crm_eur_invt = BashOperator(
    task_id='crm_eur_invt_job',
    bash_command='date',
    dag=dag)

crm_member_cohort_analysis = BashOperator(
    task_id='crm_member_cohort_analysis_job',
    bash_command='date',
    dag=dag)

crm_member_cohort_analysis.set_upstream(load_crm_goods)
crm_member_cohort_analysis.set_upstream(load_crm_memeber)
crm_member_cohort_analysis.set_upstream(load_crm_order)
crm_member_cohort_analysis.set_upstream(load_crm_eur_invt)

crm_member_kpi_daily = BashOperator(
    task_id='crm_member_kpi_daily_job',
    bash_command='date',
    dag=dag)

crm_member_kpi_daily.set_upstream(crm_member_cohort_analysis)

Я пытался обновить airflow.cfg, добавив память по умолчанию с 512 до даже 4096, но безуспешно. Есть ли у кого-нибудь совет?

Также попробуйте обновить мои JiraUtil и WechatUtil следующим образом, выдавая ту же ошибку

WechatUtil:

import requests

class WechatUtil:
    @staticmethod
    def notify_trendy_user(user_ldap_id, message):
        return None

    @staticmethod
    def notify_bigdata_team(message):
        return None

JiraUtil:

import json
import requests
class JiraUtil:
    @staticmethod
    def execute_jql(jql):
        return None

    @staticmethod
    def create_incident(projectKey, summary, desc, assignee=None):
        return None

1 Ответ

0 голосов
/ 07 июня 2018

(Я немного стреляю из трассирующих пуль, так что потерпите меня, если с первого раза этот ответ не получится).

Проблема нулевого оператора с несколькими экземплярами задачи странная ... это поможет приблизиться к устранению неполадок, если вы можете свести текущий код к MCVE , например, 1–2 операторам, исключая JiraUtil и Детали WechatUtil, если они не связаны с ошибкой обратного вызова.

Вот 2 идеи:

1. Можете ли вы попробовать изменить строку, извлекающую экземпляр задачи из контекста, чтобы увидеть, имеет ли это значение?

До:

def on_failure_callback(context):
    ti = context['task_instance']
    ...

После того, как:

def on_failure_callback(context):
    ti = context['ti']
    ...

Я видел это использование в репозитории Airflow (https://github.com/apache/incubator-airflow/blob/c1d583f91a0b4185f760a64acbeae86739479cdb/airflow/contrib/hooks/qubole_check_hook.py#L88). Возможно, к нему можно получить доступ обоими способами.

2. Можете ли вы попробовать добавить provide_context=True к операторам в виде kwarg или в default_args?

...