Я определил соединение SSH через интерфейс администратора Airflow. Однако я определяю только учетную запись службы, хост и порт в пользовательском интерфейсе. Я получаю пароль в первом экземпляре задачи, и мне нужно обновить соединение SSH с помощью пароля во втором экземпляре задачи и использовать его в третьем экземпляре задачи.
- t1: вызов функции R для получения пароля для учетной записи svc (сохранено
в xcom_push)
- t2: обновите соединение SSH с помощью этого пароля (я использую
SSHHook) ssh02.password = пароль (получен через xcom_pull)
- t3: вызов сервера с использованием ранее обновленного соединения (ssh02)
В настоящее время t1 и t2 работают должным образом, однако t3 завершается ошибкой, так как пароль не обновляется, и он ищет .ssh
аутентификацию на основе файла ключа. Может кто-нибудь подсказать, как это можно реализовать?
Вот мой фрагмент кода:
from airflow import models
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.models import Variable
from airflow.models import Connection
from airflow.settings import Session
from airflow.utils import db
from airflow.utils.db import provide_session
from airflow import DAG
import logging
import os
svcpassword = 'XXXX'
logging.getLogger().setLevel(logging.DEBUG)
ssh01 = SSHHook(ssh_conn_id='ssh_conn1')
ssh02 = SSHHook(ssh_conn_id='ssh_conn2')
default_args = {
'owner': 'user',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['abcd@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay':timedelta(minutes=1)
}
dag = DAG('dag_POC', default_args=default_args,
schedule_interval="@once")
path1 = '/home/user/R_samplescript'
t1 = SSHOperator(
task_id='SSHTask',
command='Rscript '+path1+'.R',
ssh_hook=ssh01,
params={},retries =1 ,
do_xcom_push = True,
dag = dag
)
def create_new_connection(**kwargs):
ti = kwargs['ti']
pwd = ti.xcom_pull(task_ids='SSHTask')
password = str(pwd).replace("\\n","\n")
password = password[password.find(' ')+1 : ]
password = password.strip()
svcpassword = password
db.merge_conn( models.Connection(
conn_id='ssh_conn2', conn_type='SSH',
host='server_name', port='XXXX',login =
'account_name',password = svcpassword))
t2 = PythonOperator(
task_id='Create_Connection',
python_callable=create_new_connection,
provide_context=True,
dag=dag
)
t3 = SSHOperator(
task_id='RemoteCallTest',
command="R command",
ssh_hook = SSHHook().get_conn('ssh_conn2'),
do_xcom_push = False,
retries = 1,
dag=dag
)
t1 >> t2 >> t3