Задачи воздушного потока зависали при повторной попытке на 30 минут, а затем успешно выполнялись - PullRequest
0 голосов
/ 21 января 2020

Я относительно новичок в использовании Airflow. Я уже прочитал десятки сообщений и не могу найти рабочее решение. Я был бы очень признателен, если бы кто-нибудь помог мне направить меня в правильном направлении.

У меня есть задание, которое нужно запускать каждую минуту (код ниже). У него есть 4 задачи: переместить файлы в AWS S3, выполнить команду SQL в базу данных Impala, выполнить вторую команду SQL, а затем переместить файлы в S3 в другое место.

В конечном итоге все задачи выполняются успешно, но только после того, как они находились в состоянии ожидания повтора в течение 30 минут для каждой задачи. Учитывая, что это должно выполняться раз в минуту, это, очевидно, не сработает.

Я не могу найти ничего полезного в журналах, с чем мне приходится работать:

Task is not ready for retry yet but will be retried automatically. Current date is 2020-01-20T20:22:39.926112+00:00 and task will be retried at 2020-01-20T20:23:27.234433+00:00.

журнал для одной из задач выглядит следующим образом:

--------------------------------------------------------------------------------
[2020-01-20 19:19:15,760] {taskinstance.py:842} INFO - Starting attempt 1 of 2
[2020-01-20 19:19:15,760] {taskinstance.py:843} INFO -
--------------------------------------------------------------------------------
[2020-01-20 19:19:15,773] {taskinstance.py:862} INFO - Executing <Task(PythonOperator): invalidate_metadata> on 2020-01-20T19:11:00+00:00
[2020-01-20 19:19:15,773] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'p_weather_home_commercial_v1_ingest', 'invalidate_metadata', '2020-01-20T19:11:00+00:00', '--job_id', '64', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/weather_home-commercial_v1-ingest.py', '--cfg_path', '/tmp/tmp2i9g9v59']
[2020-01-20 19:19:16,914] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:16,913] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=11729
[2020-01-20 19:19:17,633] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,632] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-01-20 19:19:17,634] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,634] {dagbag.py:92} INFO - Filling up the DagBag from /root/airflow/dags/weather_home-commercial_v1-ingest.py
[2020-01-20 19:19:17,812] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,812] {credentials.py:1032} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2020-01-20 19:19:17,829] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,829] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): secretsmanager.us-west-2.amazonaws.com
[2020-01-20 19:19:18,092] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:18,091] {cli.py:545} INFO - Running <TaskInstance: p_weather_home_commercial_v1_ingest.invalidate_metadata 2020-01-20T19:11:00+00:00 [running]> on host ip-10-190-8-162.us-west-2.compute.internal
[2020-01-20 19:19:18,137] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=p_weather_home_commercial_v1_ingest
AIRFLOW_CTX_TASK_ID=invalidate_metadata
AIRFLOW_CTX_EXECUTION_DATE=2020-01-20T19:11:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-01-20T19:11:00+00:00
[2020-01-20 19:19:18,156] {logging_mixin.py:112} INFO - [2020-01-20 19:19:18,156] {base_hook.py:84} INFO - Using connection to: id: Impala-prod. Host: jdbc:impala://internal-impala-internal-100737082.us-west-2.elb.amazonaws.com:21050/default;transportMode=sasl;AuthMech=3;SSL=1;AllowSelfSignedCerts=1;CAIssuedCertNamesMismatch=1;, Port: None, Schema: None, Login: cdh6prod.airflow, Password: XXXXXXXX, extra: {'extra__jdbc__drv_path': '/opt/airflow/var/lib/ImpalaJDBC4.jar', 'extra__jdbc__drv_clsname': 'com.cloudera.impala.jdbc4.Driver', 'extra__google_cloud_platform__project': '', 'extra__google_cloud_platform__key_path': '', 'extra__google_cloud_platform__keyfile_dict': '', 'extra__google_cloud_platform__scope': '', 'extra__google_cloud_platform__num_retries': None, 'extra__grpc__auth_type': '', 'extra__grpc__credentials_pem_file': '', 'extra__grpc__scopes': ''}
[2020-01-20 19:19:18,923] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata log4j:WARN No appenders could be found for logger (com.cloudera.impala.jdbc4.internal.apache.thrift.transport.TSaslTransport).
[2020-01-20 19:19:18,923] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata log4j:WARN Please initialize the log4j system properly.
[2020-01-20 19:19:18,923] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2020-01-20 19:19:19,293] {logging_mixin.py:112} INFO - [2020-01-20 19:19:19,292] {dbapi_hook.py:171} INFO - INVALIDATE METADATA weather_home.commercial_v1_loading;
[2020-01-20 19:19:19,497] {python_operator.py:114} INFO - Done. Returned value was: Ran Query '{}' against prod impala
[2020-01-20 19:19:20,700] {logging_mixin.py:112} INFO - [2020-01-20 19:19:20,700] {local_task_job.py:124} WARNING - Time since last heartbeat(0.02 s) < heartrate(5.0 s), sleeping for 4.980271 s
[2020-01-20 19:19:25,685] {logging_mixin.py:112} INFO - [2020-01-20 19:19:25,684] {local_task_job.py:103} INFO - Task exited with return code 0

Но все задачи возвращаются одинаково, показывая, что они завершаются с кодом выхода 0.

Дагы генерируются динамически с помощью скрипта, который запрашивает базу данных метаданных, которую мы разработали дома. У нас есть пара сотен таких, чтобы их можно было автоматизировать.

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
import airflow.hooks.S3_hook 
from airflow.hooks.jdbc_hook import JdbcHook
from pymongo import MongoClient
import json
import boto3
import base64
from botocore.exceptions import ClientError
import sys


####### config ##################################################################################################

namespace = "dish.iot.housefly"
name = "Weather_Home"
version = 1.0

####### get the schema ##########################################################################################

def get_secret(secret):

    secret_name = secret
    region_name = "us-west-2"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'DecryptionFailureException':
            # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InternalServiceErrorException':
            # An error occurred on the server side.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidParameterException':
            # You provided an invalid value for a parameter.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidRequestException':
            # You provided a parameter value that is not valid for the current state of the resource.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'ResourceNotFoundException':
            # We can't find the resource that you asked for.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
    else:
        # Decrypts secret using the associated KMS CMK.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
            return(secret)
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            return(decoded_binary_secret)



sb_creds = json.loads(get_secret("*******"))


client = MongoClient("mongodb://******:"+ sb_creds["Pass"] +"@************.us-west-2.docdb.amazonaws.com:*****/?ssl=true&ssl_ca_certs=/root/airflow/rds-combined-ca-bundle.pem&replicaSet=rs0")

db = client["Schema_Store"]

dbcollection = db[namespace]

schema = dbcollection.find_one({"Namespace":namespace,"Name":name,"Version":version})

###### Airflow ################################################################################################

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 31),
    'email': ['********'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('p_{}_{}_ingest'.format(schema['Database'],schema['Table']), default_args=default_args, catchup=False, max_active_runs=1, schedule_interval=schema['Cron'])
################################################################################################################

def executeSQL(sqlString):
   impHook = JdbcHook(jdbc_conn_id="Impala-prod")
   impHook.run(sqlString)
   return("Ran Query '{}' against prod impala")


#########################################################################################

t1 = BashOperator(
    task_id="staging_to_loading",
    bash_command="aws s3 mv s3://{0}{1}/staging s3://{0}{1}/loading --recursive".format(schema['S3_bucket'],schema['S3_key']),
    dag=dag
    )

t2 = PythonOperator(
        task_id="invalidate_metadata",
        python_callable=executeSQL,
        op_kwargs={
                   'sqlString':'INVALIDATE METADATA {}.{}_loading;'.format(schema['Database'],schema['Table']),
                  },
       dag=dag,
    )

t3 = PythonOperator(
        task_id="load_data_into_impala",
        python_callable=executeSQL,
        op_kwargs={
                   'sqlString':'INSERT INTO {0}.{1}_today PARTITION (tenantid, applicationid, dt) SELECT * FROM {0}.{1}_loading;'.format(schema['Database'],schema['Table']),
                  },
        dag=dag,
    )

t4 = BashOperator(
    task_id="loading_to_processed",
    bash_command="aws s3 mv s3://{0}{1}/loading s3://{0}{1}/processed --recursive".format(schema['S3_bucket'],schema['S3_key']),
    dag=dag
    )

t1 >> t2 >> t3 >> t4

1 Ответ

0 голосов
/ 21 января 2020

Я думаю, что обтекание Python операторов в try кроме (с traceback.print_ex c () в исключении) поможет получить больше информации

Кстати, когда я начал изучать Airflow, я использовал этот инструмент для игры с пользовательским интерфейсом. Может быть, вы найдете это полезным:

http://choose.tools/tool?id=Airflow&utm_source=59844912&utm_medium=airflow&utm_campaign=so

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...