Apache AIRFLOW - Как отправить аргументы в Python Script - PullRequest
0 голосов
/ 12 июня 2018

В Admin->Connection Я установил Conn Type S3.

В основном у меня есть этот код в скрипте My Python:

if __name__ == '__main__':
    AWS_ACCESS_KEY_ID = "..."
    AWS_SECRET_ACCESS_KEY = "..."
    AWS_DEFAULT_REGION = "..."
    Start_Work

Что я хочу сделать, это вызвать мой скрипт из Airflowи передать ему аргументы для соединения (вместо жесткого кода их в скрипте).

Как мне это сделать?

Редактировать: предположим, что это соединение: enter image description here

Как получить доступ к каждой поданной информации?

Ответы [ 2 ]

0 голосов
/ 14 июня 2018

Я вижу, что у вас есть идентификатор соединения M_1, а тип соединения - S3, поэтому вы можете загрузить его внутри PythonOperator (или сценария python с именем BashOperator) с помощью:

from airflow.hooks.s3_hook import S3Hook

def py_op_callable:
    hook = S3Hook('M_1')
    botocore_credentials = hook.get_credentials()
    botocore_credentials.access_key
    botocore_credentials.secret_key
    botocore_credentials.token

На v1.9.0 кажется, что get_credentials еще нет.На AwsHook есть только приватный _get_credentials(), который наследует S3Hook.Если вы уверены, что вставили их в дополнительные параметры, прямой подход:

from airflow.hooks.base_hook import BaseHook

def py_op_callable:
    hook = BaseHook('M_1')
    extra = hook.get_connection().extra_dejson
    key_id = extra.get('aws_access_key_id')
    secret_key = extra.get('aws_secret_access_key')
    default_region = extra.get('region_name')
    return key_id,secret_key,default_region
0 голосов
/ 12 июня 2018

Одна вещь, которую вы можете сделать, это импортировать утилиту provide_session, чтобы затем извлечь соединение на основе conn_id.Затем вы можете передать это оператору Python.

Таким образом, это выглядело бы примерно так:

from airflow.utils.db import provide_session

@provide_session
def get_conn(conn_id, session=None):
    conn = (session.query(Connection)
                   .filter(Connection.conn_id == conn_id)
                   .first())
    return conn

def my_python_function():

   conn = get_conn('connection_id')

   key_id = conn.extra_dejson.get('AWS_ACCESS_KEY_ID')
   secret_key = conn.extra_dejson.get('AWS_SECRET_ACCESS_KEY')
   default_region = conn.extra_dejson.get('DEFAULT_REGION')

task1 = PythonOperator(task_id='my_task', python_callable=my_python_function, dag=dag)

task1

EDIT: Удалены кавычки из вызываемого Python

...