как вызвать задачу из функции в потоке воздуха - PullRequest
0 голосов
/ 06 декабря 2018
def perPracticeFileToCDRJob(**context):
    #t1 = BashOperator(task_id='task',xcom_push=True,bash_command="aws s3 ls s3://bucket/test_csv_output/ | wc -l", dag=dag)
    val=context['task_instance'].xcom_pull(task_ids='practiceCount_task')

    #vaule=os.system("aws s3 ls s3://bucket/test_csv_output/ | wc -l")
    #print('****val=$val*****')
    print(val)
    for i in val:
            print(i)
            subprocess.getstatusoutput(fileToCDRConfigUpdate)
            EmrAddStepsOperator(
            task_id='spark_submit_csvProcessor',
           job_flow_id=clusterId,
           aws_conn_id=awsConnId,
           steps=SPARK_TEST_STEPS_CSVPROCESSOR,
          dag=dag
...