Как динамически обновлять параметры существующего соединения Airflow (версия 1.9) в коде? - PullRequest
0 голосов
/ 06 июля 2018

Я определил соединение SSH через интерфейс администратора Airflow. Однако я определяю только учетную запись службы, хост и порт в пользовательском интерфейсе. Я получаю пароль в первом экземпляре задачи, и мне нужно обновить соединение SSH с помощью пароля во втором экземпляре задачи и использовать его в третьем экземпляре задачи.

  • t1: вызов функции R для получения пароля для учетной записи svc (сохранено в xcom_push)
  • t2: обновите соединение SSH с помощью этого пароля (я использую SSHHook) ssh02.password = пароль (получен через xcom_pull)
  • t3: вызов сервера с использованием ранее обновленного соединения (ssh02)

В настоящее время t1 и t2 работают должным образом, однако t3 завершается ошибкой, так как пароль не обновляется, и он ищет .ssh аутентификацию на основе файла ключа. Может кто-нибудь подсказать, как это можно реализовать?

Вот мой фрагмент кода:

    from airflow import models
    from airflow.contrib.operators.ssh_operator import SSHOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    from airflow.contrib.hooks.ssh_hook import SSHHook
    from airflow.models import Variable
    from airflow.models import Connection
    from airflow.settings import Session
    from airflow.utils import db
    from airflow.utils.db import provide_session
    from airflow import DAG
    import logging
    import os

    svcpassword = 'XXXX'

    logging.getLogger().setLevel(logging.DEBUG)


    ssh01 = SSHHook(ssh_conn_id='ssh_conn1')
    ssh02 = SSHHook(ssh_conn_id='ssh_conn2')


    default_args = {
     'owner': 'user',
     'depends_on_past': False,
     'start_date': datetime.now(),
     'email': ['abcd@gmail.com'],
     'email_on_failure': True,
     'email_on_retry': True,
     'retries': 1,
     'retry_delay':timedelta(minutes=1)
  }


   dag = DAG('dag_POC', default_args=default_args, 
   schedule_interval="@once")

   path1 = '/home/user/R_samplescript'


   t1 = SSHOperator(
        task_id='SSHTask',
        command='Rscript '+path1+'.R',
        ssh_hook=ssh01,
        params={},retries =1 ,
        do_xcom_push = True,
        dag = dag
      )

  def create_new_connection(**kwargs):
       ti = kwargs['ti']
       pwd = ti.xcom_pull(task_ids='SSHTask')
       password = str(pwd).replace("\\n","\n")
       password = password[password.find(' ')+1 : ]
       password = password.strip()
       svcpassword = password
       db.merge_conn( models.Connection(
                   conn_id='ssh_conn2', conn_type='SSH',
                    host='server_name', port='XXXX',login = 
                   'account_name',password = svcpassword))


        t2 = PythonOperator(
                task_id='Create_Connection',
                python_callable=create_new_connection,
                provide_context=True,
                dag=dag
                )


          t3 = SSHOperator(
                  task_id='RemoteCallTest',
                  command="R command",
                  ssh_hook = SSHHook().get_conn('ssh_conn2'),
                  do_xcom_push = False,
                  retries = 1,
                  dag=dag
             )



          t1 >> t2 >> t3

1 Ответ

0 голосов
/ 06 июля 2018

Вам необходимо использовать оболочку сеанса, чтобы сохранить изменения в БД

@provide_session()
def set_password(session=None):
    conn = MyHook().get_conn(conn_id)
    conn.set_password(my_password)

    session.add(conn)
    session.commit()
...