Как можно запустить параллельный поток Apache автоматически Если я запускаю DAG вручную - PullRequest
0 голосов
/ 28 ноября 2018

Я установил несколько настроек воздушного потока, чтобы выполнять параллельные задачи в воздушном потоке.Однако я подумал, что рабочий сельдерея должен запустить эту программу, когда я запускаю задачу с запуском dag в пользовательском интерфейсе airflow, который не запланировал, что он запускает только одну задачу за раз, и мне нужно щелкнуть по запуску многих задач, если я хочу достичь параллелизма.Мой dag выглядит состоит из основного dag и subdag, которые запускают сканер на основе списка карт из основных dag.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from tasks_utils.data_utils import get_uncrawled_lists,get_small_crawl_lists
from tasks_utils.subdags import crawler_retry_subdag
from datetime import datetime, timedelta
import pprint
import time
import pendulum
from airflow.operators.subdag_operator import SubDagOperator

local_tz = pendulum.timezone("Asia/Seoul")
thisYear = datetime.now().year
thisMonth = datetime.now().month
thisDate = datetime.now().day

default_args = {
    'owner': 'fidel',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['fidel@crunchprice.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_delay': timedelta(seconds=5),
    'wait_for_downstream':True,
    'provide_context':True

}

dag = DAG('cralwer_retry_tasks', default_args=default_args,schedule_interval=None)


t1 = BashOperator(
    task_id='print_date_cralwer_retry_tasks',
    bash_command='date',
    dag=dag
    )


uncrawled_op = PythonOperator(
    task_id='get_unuploaded_retry_lists',
    python_callable=get_uncrawled_lists,
    dag=dag
)

crawling_and_upload = SubDagOperator(
    task_id='crawl_and_upload_retries',
    subdag=crawler_retry_subdag('cralwer_retry_tasks', 'crawl_and_upload_retries', default_args,dag),
    default_args=default_args,
    dag=dag,
    depends_on_past=True
)
uncrawled_op.set_upstream(t1)
crawling_and_upload.set_upstream(uncrawled_op)

Вот как выглядит мой подкадр секунд.

def crawler_retry_subdag(parent_dag_name, child_dag_name, args,parent_dag):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        concurrency=10
    )
    if len(parent_dag.get_active_runs()) > 0 :
        retry_dag =  parent_dag.get_task_instances(
            settings.Session,
            start_date=parent_dag.get_active_runs()[-1]
        )
        if retry_dag:
            cardList = retry_dag[-1].xcom_pull(
            dag_id=parent_dag_name,
            task_ids='get_unuploaded_retry_lists',
            key="return_cards"
            )
            if cardList:
                for card in cardList:
                    card['isRetry'] = True 
                    site_lists_tasks = PythonOperator(
                        task_id='site_list_' + card['companyId'],
                        python_callable=get_crawled_links,
                        op_kwargs={
                            'company_site': card['companySite'],
                            'parent_dag_name':parent_dag_name,
                        },
                    dag=dag_subdag,
                    )
                    crawling_and_upload = SubDagOperator(
                        task_id='crawl_and_upload_retry'+card['companyId'],
                        subdag=crawler_sub_sub_dag(
                            parent_dag_name,
                            child_dag_name,
                            'crawl_and_upload_retry'+card['companyId'],
                            card,
                            args,
                            dag_subdag
                        ),
                        default_args=args,
                        dag=dag_subdag,
                        depends_on_past=True
                    )
                    crawling_and_upload.set_upstream(site_lists_tasks)
    return dag_subdag
...