Мне нужно создать несколько групп DAG или одну группу DAG и несколько задач? - PullRequest
0 голосов
/ 06 мая 2020

Я изо всех сил пытаюсь понять, нужно ли мне создавать несколько групп DAG или просто несколько задач для того, что я делаю. Весь процесс запускается каждые несколько часов или запускается вручную. Вот суть:

  1. L oop через набор тестов
  2. Запустить тест.
  3. Результаты проверки
    • Если тесты пройдены, переходите к следующему набору тестов.
    • Если тесты терпят неудачу, запишите информацию об ошибке (время, причину, какой тест был запущен) в базе данных.
  4. Рассылка сообщений о сбоях.

Сейчас у меня то же самое, но я использую Celery Beats, чтобы периодически запускать все это, и я хотел бы перейти на Airflow.

Вот часть кода У меня пока есть:

def check_test_status(**kwargs):
    try:
        test_case = kwargs.pop('test_case', None)
        url = [ line.strip('\r\n') for line in open(os.getenv('TEST_SITES','test_sites.txt'),'r')]
        test_user = test_case['username']
        test_pass = test_case['password']
        test_ip = test_case['ip']
        test_port = test_case['http test port']
        proxies = {
                   'https':'http://{0}:{1}@{2}:{3}'.format(test_user, test_pass, test_ip, test_port),
                   'http':'http://{0}:{1}@{2}:{3}'.format(test_user, test_pass, test_ip, test_port)
                  }
        test_session = requests.Session()
        response = test_session.get(url, headers=headers, proxies=proxies, stream=True, timeout=8, allow_redirects=False)
        ret_code = response.status_code
        if response.ok or response.status_code in app.conf['ACCEPTED_CODES']: 
            return True, ret_code
        else: 
            return False, ret_code
    except Exception as exc:
        import traceback
        test_checker_logger.error('[x] Error {0}'.format(traceback.format_exc()))
        return str(exc)


def already_failed(**kwargs):
    test_id = kwargs.pop('test_id', None)
    result = test_conn.find_one('test_down', {'test_id':test_id})
    test_checker_logger.info(result)
    if result:
        if incr_down_count(test_id=test['_id'], client=client) == 2:
    else:
        update_test_status(status='false', client=client, test_id=test['_id'])

args = {
    'owner':'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id = 'checker_python_operator',
    default_args = args,
    description = 'Test suite testing  workflow Python DAG',
    schedule_interval = None
)

test_conn = MongoHook(conn_id='test_selector_mongo')
test_list = test_conn.find('test_info', None).limit(1)
for count, test_case in enumerate(test_list):
    check_status_task = PythonOperator(
                     task_id='check_test_status_'+str(count),
                     python_callable=check_test_status,
                     op_kwargs={'test_case':test_case},
                     provide_context=True,
                     dag=dag)
    already_failed_task = PythonOperator(
                          task_id='already_failed_' + str(count),
                          op_kwargs={'task_id':'check_test_stats_'+str(count)},
                          python_callable=already_failed,
                          provide_context=True,
                          dag=dag)

   check_status_task>>already_failed_task  

Нужно ли мне создавать несколько групп DAG или просто несколько задач для того, что я делаю? Я полностью упустил из виду это?

1 Ответ

3 голосов
/ 06 мая 2020

Я бы создал для этого один DAG и просто связал бы тесты вместе. Если тест не пройден, отметьте задачу как неудачную; стандартная конфигурация предназначена для того, чтобы зависимые задачи не запускались.

Когда вы генерируете DAG динамически, вам нужно быть уверенным, что все ваши задачи, однажды добавленные в DAG, останутся навсегда. Если тесты можно удалить, вы, вероятно, захотите создать для них фиктивную задачу. Из Рекомендации документации :

Удаление задачи

Никогда не удаляйте задачу из группы доступности базы данных. В случае удаления историческая информация о задаче исчезает из пользовательского интерфейса Airflow. Рекомендуется создать новую группу DAG на случай, если задачи необходимо удалить.

Это означает, что если тест удален и для теста не создается задача, то вы больше не можете видеть как этот удаленный тест проводился в прошлом. Если вместо этого вы поместили фиктивную задачу на их место, вы все равно сможете увидеть, как тест выполнялся в прошлом до его удаления.

Принимая ваши требования:

  1. L oop через набор тестов
  2. Запустить тест.
  3. Результаты проверки
    • Если тесты пройдены, переходите к следующему набору тестов.
    • Если тесты терпят неудачу, записать информацию об ошибке (время, причина, какой тест был запущен) в базе данных.
  4. Отправьте сообщение о сбоях.

Шаг 1. генерируете ли вы DAG из вашего источника PyMon go (заботясь о замене удалил тест с пустышками, так что отслеживайте где-нибудь, какие задачи вы создали)

Шаг 2. запускает тест в задаче.

Шаг 3 - та же задача проверка результатов; если задача не удалась, установите состояние экземпляра задачи как сбой, вызвав исключение AirflowException. Подойдет почти любое исключение, но AirFlowException("reason") немного чище.

Шаг 4 - это задача, которая зависит от всех тестовых задач и выполняется безоговорочно (независимо от результатов теста), а также создает и отправляет электронное письмо как последний шаг в группе DAG. Вы хотите установить его trigger_rule значение на all_done, чтобы выполнить sh this; он будет запущен после того, как все задачи, от которых он зависит, завершились успешно, не прошли или были пропущены.

Обратите внимание, что задачи будут повторяться, если вы дадите им retries значение больше 0; для ваших тестов вы, вероятно, захотите установить это значение в 0.

Итак, чтобы сгенерировать DAG, вы должны использовать:

test_tasks = []
last_test = None

for count, test_case in enumerate(test_list):
    check_status_task = PythonOperator(
        task_id='check_test_status_'+str(count),
        python_callable=check_test_status,
        op_kwargs={'test_case': test_case},
        provide_context=True,
        retries=0,  # no retrying when it fails
        dag=dag,
    )

    if test_tasks:
        # add task to pipeline defined by all tasks running in series
        check_status_task << test_tasks[-1]
    test_tasks.append(check_status_task)

email_result_task = PythonOperator(
    task_id='email_result',
    python_callable=email_result,
    provide_context=True,
    # run when all parent tasks are done, regardless the outcome
    trigger_rule='all_done',
    dag=dag
)
test_tasks >> email_result_task

Приведенный выше код строит граф зависимостей, беря каждый предыдущий test (test_tasks[-1]) в качестве восходящей задачи для только что сгенерированной в l oop. Почтовая задача зависит от всего списка тестовых задач.

Обратите внимание, что, поскольку вы устанавливаете provide_context=True и предоставляете словарь op_kwargs, вы можете просто предоставить своим операторам прямые аргументы для любого конкретного c контекста или op_kwargs имен, к которым вам нужен доступ в вашей функции. Тогда check_test_status() становится:

from airflow import AirflowException

def check_test_status(test_case, **kwargs):
    # run the test, get the result
    ...

    if test_failed:
        # log failure in database
        ...
        # then mark this task as failed
        raise AirflowException("Test <testcase> failed: ...")

Строка raise AirflowException() приводит к тому, что эта тестовая задача помечается как неудачная, а все другие тестовые задачи в вашей группе DAG не будут выполняться сейчас, потому что они используют правило триггера по умолчанию trigger_rule='all_success'.

Если вместо этого вы используете return в своей функции PythonOperator, то возвращаемое значение просто отбрасывается, если вы не используете xcom_push=True в задаче, после чего он помещается в специальную переменную XCom для этой задачи, airfow.models.XCOM_RETURN_KEY. Лично я предпочитаю использовать специальные переменные Xcom, особенно когда вам также нужно помечать задачи как неуспешные, вызывая исключения.

Итак, для последней задачи электронной почты вы можете использовать переменную XCom для передачи информации из любого тестовые задания, которые выполнялись и далее; все, что можно мариновать или сериализовать в JSON (в зависимости от конфигурации Airflow), можно передать таким образом.

XCOM_TEST_STATUS_KEY = "check_test_status"

def check_test_status(test_case, ti, **kwargs):
    # run the test, get the result
    ...

    test_status = {
        # ... information to share with the mail task            
    }

    ti.xcom_push(XCOM_TEST_STATUS_KEY, test_status)

    if test_failed:
        # log failure in database
        ...
        # then mark this task as failed
        raise AirflowException("Test <testcase> failed: ...")

, а затем в задаче электронной почты:

def email_result(dag, ti, **kwargs):
    test_tasks = [id for id in dag.task_ids if id.starts_with('check_test_status_')]
    test_results = ti.xcom_pull(
        task_ids=test_tasks,
        key=XCOM_TEST_STATUS_KEY,
    )
    # process test_results to produce an email
...