Я создал функцию on_failure_callback (ссылаясь на Воздушный поток по умолчанию on_failure_callback ) для обработки сбоя задачи.
Хорошо работает, когда в группе обеспечения доступности баз данных есть только одна задача, однако, если есть еще 2 задачи, случайная задача не выполняется , так как оператор имеет значение null , она может быть возобновлена позже вручную. В airflow-scheduler.out журнал выглядит так:
[2018-05-08 14: 24: 21,237] {models.py:1595} ОШИБКА - Отчеты исполнителя
Экземпляр задачи% s завершен (% s), хотя задача говорит о своем% s. Был
задание убито внешне? NoneType [2018-05-08 14: 24: 21,238]
{jobs.py:1435} ОШИБКА - Невозможно загрузить пакет dag, чтобы обработать ошибку для
, Установка задачи на FAILED без
обратные вызовы или повторные попытки. Достаточно ли у вас ресурсов?
Код DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
import airflow
from devops.util import WechatUtil
from devops.util import JiraUtil
def on_failure_callback(context):
ti = context['task_instance']
log_url = ti.log_url
owner = ti.task.owner
ti_str = str(context['task_instance'])
wechat_msg = "%s - Owner:%s"%(ti_str,owner)
WeChatUtil.notify_team(wechat_msg)
jira_desc = "Please check log from url %s"%(log_url)
JiraUtil.create_incident("DW",ti_str,jira_desc,owner)
args = {
'queue': 'default',
'start_date': airflow.utils.dates.days_ago(1),
'retry_delay': timedelta(minutes=1),
'on_failure_callback': on_failure_callback,
'owner': 'user1',
}
dag = DAG(dag_id='test_dependence1',default_args=args,schedule_interval='10 16 * * *')
load_crm_goods = BashOperator(
task_id='crm_goods_job',
bash_command='date',
dag=dag)
load_crm_memeber = BashOperator(
task_id='crm_member_job',
bash_command='date',
dag=dag)
load_crm_order = BashOperator(
task_id='crm_order_job',
bash_command='date',
dag=dag)
load_crm_eur_invt = BashOperator(
task_id='crm_eur_invt_job',
bash_command='date',
dag=dag)
crm_member_cohort_analysis = BashOperator(
task_id='crm_member_cohort_analysis_job',
bash_command='date',
dag=dag)
crm_member_cohort_analysis.set_upstream(load_crm_goods)
crm_member_cohort_analysis.set_upstream(load_crm_memeber)
crm_member_cohort_analysis.set_upstream(load_crm_order)
crm_member_cohort_analysis.set_upstream(load_crm_eur_invt)
crm_member_kpi_daily = BashOperator(
task_id='crm_member_kpi_daily_job',
bash_command='date',
dag=dag)
crm_member_kpi_daily.set_upstream(crm_member_cohort_analysis)
Я пытался обновить airflow.cfg, добавив память по умолчанию с 512 до даже 4096, но безуспешно. Есть ли у кого-нибудь совет?
Также попробуйте обновить мои JiraUtil и WechatUtil следующим образом, выдавая ту же ошибку
WechatUtil:
import requests
class WechatUtil:
@staticmethod
def notify_trendy_user(user_ldap_id, message):
return None
@staticmethod
def notify_bigdata_team(message):
return None
JiraUtil:
import json
import requests
class JiraUtil:
@staticmethod
def execute_jql(jql):
return None
@staticmethod
def create_incident(projectKey, summary, desc, assignee=None):
return None