Я пытаюсь создать полноценный даг. Я должен использовать SparkSubmitOperator, но я не знаю все параметры конфигурации до выполнения. Я получаю часть конфигурации из json, из hdfs, и некоторую дополнительную часть из rest, из ** kwargs ['dag_run']. Conf.
Я пытался использовать разные глобальные переменные и Xcom, но, похоже, это не работает для SparkSubmitOperator.
import subprocess
from script import neoflex_config_handler
from config import config
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
# context['dag_run'].conf['local_path']
# '{{dag_run.conf.hdfs_path}}'
# '{{dag_run.conf.java_class}}'
dag = DAG('ShapovalovTransformation1',
description='Transformation-test',
schedule_interval=None,
start_date=datetime(2017, 3, 20))
config_path = config.DevelopmentConfig.BASE_DATAGRAMM_DEPLOYMENTS_PATH + dag.dag_id
config_name = '{0}.json'.format(dag.dag_id)
_config = config_handler.get_config_hdfs(config_path, config_name)
# _config = config_handler.add_additional_application_args(_config, additional_args)
# _config = "{{ ti.xcom_pull(task_ids=get_config) }}"
def push_config(**kwargs):
additional_args = kwargs['dag_run'].conf
print('Additional args: {0}'.format(additional_args))
config_path_2 = config.DevelopmentConfig.BASE_DATAGRAMM_DEPLOYMENTS_PATH + dag.dag_id
print('Config path 2: {0}'.format(config_path_2))
config_name_2 = '{0}.json'.format(dag.dag_id)
print('Config name 2: {0}'.format(config_name_2))
_config_2 = config_handler.get_config_hdfs(config_path_2, config_name_2)
print('Config from hdfs 2: {0}'.format(_config_2))
_config_2 = config_handler.add_additional_application_args(_config_2, additional_args)
print('Config full, hdfs+rest 2: {0}'.format(_config_2))
return _config_2
def run_cmd_send_event(**kwargs):
print('Running system command: {0}'.format('echo'))
proc = subprocess.Popen(['echo', 'task completed'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
s_output, s_err = proc.communicate()
s_return = proc.returncode
return s_return, s_output, s_err
get_config = PythonOperator(task_id='get_config',
provide_context=True,
python_callable=push_config,
dag=dag)
run_etl = SparkSubmitOperator(task_id='run_etl',
dag=dag,
**_config)
send_event = PythonOperator(task_id='send_event',
dag=dag,
provide_context=True,
python_callable=run_cmd_send_event)
get_config >> run_etl >> send_event
Пример FullConfig:
app_args = [
'MASTER=yarn',
'SLIDE_SIZE=400',
'FETCH_SIZE=1000',
'PARTITION_NUM=1',
'FAIL_THRESHOLD=1000',
'bootstrap=devhdp1:6667,devhdp2:6667,devhdp3:6667',
'topics=load',
'destination=/bdp/tmp/Channel1/version=version1',
'startingOffsets={"load":{"0":1002}}',
'endingOffsets={"load":{"0":1004}}',
'USER=airflow',
'HOME=\/user/airflow/',
'ROOT_WORKFLOW_ID='+dag_name,
'CURRENT_WORKFLOW_ID='+dag_name]
_config = {
'java_class': '2StepsJob',
'master': 'yarn',
'deploy-mode': 'client',
'driver-memory': '3G',
'application': 'hdfs://devhdp-nn1.local-counter.tns:8020/bdp/etl-jars/hdfs/deployments/2Steps/1.0-SNAPSHOT.jar',
'application_args': app_args
}
Что мне нужно:
После того, как я получу параметры от json и Rest (кроме того, мне нужно получить некоторые параметры от zookeeper), я хочу создать полную версию ** _ config и передать ее SparkSubmitOperator.