Воздушный поток - определение ключа, значения для функции xcom_push - PullRequest
0 голосов
/ 30 мая 2018

Я пытаюсь передать функцию Python в Airflow.Я не уверен, что ключ и значения должны быть для функции xcom_push.Может ли кто-нибудь помочь в этом.Спасибо

def db_log(**context):
  db_con = psycopg2.connect(" dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
  task_instance = context['task_instance']
  task_instance.xcom_push(key=db_con, value = db_log)
  return (db_con)

Может ли кто-нибудь помочь в получении правильного ключа и значения для функции xcom_push.Спасибо ..

Ответы [ 4 ]

0 голосов
/ 29 июля 2019

Вместо использования xcom для подключения к вашей БД я бы порекомендовал вам использовать Соединения: https://airflow.apache.org/howto/connection/index.html

Начните с установки соединения для подключения к вашей БД из командной строки с помощью:

airflow connections -a --conn_id postgres_custom --conn_host <your-host> --conn_type postgres --conn_port 1234 --conn_login <username> --conn_password <password> --conn_extra {"sslmode": "require"}

Или прямо из интерфейса.Вот некоторая документация о том, как установить соединение postgres в воздушном потоке (работает также с другими типами БД): https://airflow.apache.org/howto/connection/postgres.html

Затем вы можете запросить вашу базу данных с некоторым DAG:

DAG_ARGS = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


DAG_ID = "Dummy_DAG"


with DAG(dag_id=DAG_ID,
         default_args=DAG_ARGS,
         schedule_interval=None) as dag:

    query_1 = PostgresOperator(
        task_id='POSTGRES_QUERY',
        postgres_conn_id='postgres_custom',
        sql= """SELECT COUNT(*) FROM TABLE A""",
        database="my-db",
        dag=dag,
    )

    query_2 = PostgresOperator(
        task_id='POSTGRES_QUERY_2',
        postgres_conn_id='postgres_custom',
        sql="""SELECT COUNT(*) FROM TABLE B""",
        database="my-db",
        dag=dag,
    )

    query_1 >> query_2
0 голосов
/ 18 августа 2018

См. Приведенный ниже пример:

Надеюсь, что это поможет.

args = {
    'owner': 'airflow',
    'start_date': start_date
}

dag = DAG(dag_id = 'test_dag', schedule_interval=None, default_args=args)
y = 0

def LoadYaml(**kwargs):
        y = 'df-12345567789'
        kwargs['ti'].xcom_push(key='name',value=y)
        return True

def CreatePipeLine(**kwargs):
        print("I m client")

def ActivatePipeline(client,pipelineId):
        print("activated", client, pipelineId)

start_task = DummyOperator(task_id='Start_Task', dag=dag)

LoadYaml_task = ShortCircuitOperator(task_id='LoadYaml_task',provide_context=True,python_callable=LoadYaml,dag=dag)

start_task.set_downstream(LoadYaml_task)

CreatePipeLine_task = ShortCircuitOperator(task_id='CreatePipeLine_task',provide_context=True,python_callable=CreatePipeLine,op_kwargs = {'client' : 'HeyImclient'},dag=dag)

LoadYaml_task.set_downstream(CreatePipeLine_task)

ActivatePipeline_task= ShortCircuitOperator(task_id='ActivatePipeline_task',provide_context=True,python_callable=ActivatePipeline,op_kwargs = {'client' : 'You','pipelineId' : '1234'},dag=dag)

CreatePipeLine_task.set_downstream(ActivatePipeline_task)
0 голосов
/ 29 июля 2019

Это немного устарело, но насколько я понимаю, если вы запускаете db_log в качестве задачи, то возврат db_con автоматически отправит его в xcom.

Вы можете получить к нему доступ с помощью {{ti.xcom_pull(task_ids='TASK_NAME_HERE')}}

0 голосов
/ 30 мая 2018

В примерах можно найти правильный способ вызова, например: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py

Так что здесь должно быть

task_instance.xcom_push(key=<string identifier>, value=<actual value / object>)

В вашем случае

task_instance.xcom_push(key="db_con", value=db_con)
...