Как передать ** конфигурационный файл в SparkSubmitOperator от другого оператора - PullRequest
0 голосов

Я пытаюсь создать полноценный даг. Я должен использовать SparkSubmitOperator, но я не знаю все параметры конфигурации до выполнения. Я получаю часть конфигурации из json, из hdfs, и некоторую дополнительную часть из rest, из ** kwargs ['dag_run']. Conf.

Я пытался использовать разные глобальные переменные и Xcom, но, похоже, это не работает для SparkSubmitOperator.

import subprocess
from script import neoflex_config_handler
from config import config
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# context['dag_run'].conf['local_path']
# '{{dag_run.conf.hdfs_path}}'
# '{{dag_run.conf.java_class}}'

dag = DAG('ShapovalovTransformation1',
          description='Transformation-test',
          schedule_interval=None,
          start_date=datetime(2017, 3, 20))

config_path = config.DevelopmentConfig.BASE_DATAGRAMM_DEPLOYMENTS_PATH + dag.dag_id
config_name = '{0}.json'.format(dag.dag_id)
_config = config_handler.get_config_hdfs(config_path, config_name)


# _config = config_handler.add_additional_application_args(_config, additional_args)
# _config = "{{ ti.xcom_pull(task_ids=get_config) }}"

def push_config(**kwargs):
    additional_args = kwargs['dag_run'].conf
    print('Additional args: {0}'.format(additional_args))
    config_path_2 = config.DevelopmentConfig.BASE_DATAGRAMM_DEPLOYMENTS_PATH + dag.dag_id
    print('Config path 2: {0}'.format(config_path_2))
    config_name_2 = '{0}.json'.format(dag.dag_id)
    print('Config name 2: {0}'.format(config_name_2))
    _config_2 = config_handler.get_config_hdfs(config_path_2, config_name_2)
    print('Config from hdfs 2: {0}'.format(_config_2))
    _config_2 = config_handler.add_additional_application_args(_config_2, additional_args)
    print('Config full, hdfs+rest 2: {0}'.format(_config_2))
    return _config_2


def run_cmd_send_event(**kwargs):
    print('Running system command: {0}'.format('echo'))
    proc = subprocess.Popen(['echo', 'task completed'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    s_output, s_err = proc.communicate()
    s_return = proc.returncode
    return s_return, s_output, s_err


get_config = PythonOperator(task_id='get_config',
                            provide_context=True,
                            python_callable=push_config,
                            dag=dag)

run_etl = SparkSubmitOperator(task_id='run_etl',
                              dag=dag,
                              **_config)

send_event = PythonOperator(task_id='send_event',
                            dag=dag,
                            provide_context=True,
                            python_callable=run_cmd_send_event)

get_config >> run_etl >> send_event

Пример FullConfig:

app_args = [
    'MASTER=yarn',
    'SLIDE_SIZE=400',
    'FETCH_SIZE=1000',
    'PARTITION_NUM=1',
    'FAIL_THRESHOLD=1000',
    'bootstrap=devhdp1:6667,devhdp2:6667,devhdp3:6667',
    'topics=load',
    'destination=/bdp/tmp/Channel1/version=version1',
    'startingOffsets={"load":{"0":1002}}',
    'endingOffsets={"load":{"0":1004}}',
    'USER=airflow',
    'HOME=\/user/airflow/',
    'ROOT_WORKFLOW_ID='+dag_name,
    'CURRENT_WORKFLOW_ID='+dag_name]

_config = {
    'java_class': '2StepsJob',
    'master': 'yarn',
    'deploy-mode': 'client',
    'driver-memory': '3G',
    'application': 'hdfs://devhdp-nn1.local-counter.tns:8020/bdp/etl-jars/hdfs/deployments/2Steps/1.0-SNAPSHOT.jar',
    'application_args': app_args
}

Что мне нужно: После того, как я получу параметры от json и Rest (кроме того, мне нужно получить некоторые параметры от zookeeper), я хочу создать полную версию ** _ config и передать ее SparkSubmitOperator.

1 Ответ

0 голосов

Я решаю это. Только что написал свой собственный оператор spark-submit

...