Воздушный поток DAG не привязан к Dag-Bag: Задачи Dynami c - PullRequest
0 голосов
/ 21 апреля 2020

Последние два дня я работал над созданием новой группы обеспечения доступности баз данных, которая генерирует задачу динамически на основе списка, сгенерированного во время выполнения. Мне удалось создать DAG и синхронизировать c там, где сгенерированные задачи были построены из списка в модуле Airflow, где хранится DAG. После этого я изменил список с одного, хранящегося в модуле воздушного потока, на список, который будет сгенерирован во время выполнения при получении текстового файла с сервера sftp.

Мне удалось успешно синхронизировать группу DAG c, когда я использовал список в приведенном ниже коде, но теперь, когда я изменил его для динамического построения из run_main(), группа DAG не будет син c.

Будет редактировать этот вопрос в течение дня, чтобы попытаться уточнить больше. Просто все еще довольно плохо знаком с Airflow, так что я знаю, что здесь я не очень ясен.

import boto3
import os
import pkg_resources
import math
import sys
from airflow import DAG
from airflow.contrib.sensors.sftp_sensor import SFTPSensor
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, date
from warp_pipelines.dsg import data_transfer, sftp_file_rename
from warp_pipelines.utils import utils
from warp.api_wrappers import ftp

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 8),
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(dag_id='test_rev_transfer',
        default_args=args,
        schedule_interval="0 10 * * *",
        catchup=False
        )

#build last file path
folder_path= '/test_sftp/'
file_date= "{{ execution_date.strftime('%y%m%d') }}"
full_path= '{}360i_{}.txt'.format(folder_path, file_date)

#return item to build out load dsg_data tasks
def run_main():
    """

    """
    folder_path= '/test_sftp/'
    file_date= "{{ execution_date.strftime('%y%m%d') }}"
    full_path= '{}test_{}.txt'.format(folder_path, file_date)
    sa = utils.load_yaml(pkg_resources.resource_filename('warp_pipelines',
                                                         'test/sftp_rev.yaml')
                       )
    #build last file path
    sftp_conn= ftp.conn(host= sa['host'],
                        user= sa['user'],
                        con_type='sftp',
                        password= sa['pwd']
                        )

    sftp= ftp.FTPWrap(sftp_conn)

    try:
        rev_df= sftp.get_df(full_path,
                            delimiter='\t')
    except:
        sys.exit()

    bool_df = rev_df[rev_df['Orders'] > 0]

    rt = math.ceil((bool_df.shape[0]/10000))
    task_list = []
    for i in range(0,rt):
        if i == 0:
            value1 = 0
        else:
            value1 = i*10000
        value2=(i*10000)+10000
        task_list.append([value1, value2])

    return task_list


#custom function for sns
def run_aws_sns_topic():
     connection = BaseHook.get_connection("aws_global")
     aws_access_key_id = connection.login
     aws_secret_access_key = connection.password

     client = boto3.client('sns',
                      region_name='us-east-1',
                      aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key
                      )
     response = client.publish(
     TopicArn='arn:aws:sns:us-east-1:288219908733:test_revenue_transfer',
     Message='Data load success.',
     Subject='test_data_load {}'.format((date.today()-timedelta(days=1)).strftime('%B %d %Y'))
     )


sftp_sensor= SFTPSensor(
    task_id= 'test_rev_sensor',
    path=full_path,
    sftp_conn_id= 'test_rev_conn',
    poke_interval= 600,
    mode= 'reschedule',
    dag=dag
)


sftp1 = PythonOperator(
    task_id = 'sftp_file_move',
    python_callable=sftp_file_rename.run_main,
    dag=dag,
    provide_context=True
)


sns1 = PythonOperator(task_id='sns_success_send',
                      python_callable=run_aws_sns_topic,
                      dag=dag)


task_create = run_main()


for slice_list in task_create:
    s1 = PythonOperator(
        task_id = 'load_test_data_{}_{}'.format(slice_list[0], slice_list[1]),
        python_callable=data_transfer.run_main,
        dag=dag,
        provide_context=True,
        op_kwargs= {'slice': slice_list}
        )
    sftp_sensor >> s1 >> sftp1 >> sns1

...