Динамическое соединение EMR воздушного потока - PullRequest
0 голосов
/ 02 мая 2019

У меня есть Airflow DAG, которая создает кластер EMR, а затем запускает задачи SSHOperator на этом кластере. Прямо сейчас я жестко кодирую основной общедоступный DNS для кластера EMR в SSH-соединение Airflow. Есть ли способ для моей группы обеспечения доступности баз данных динамически заполнять этот DNS при создании кластера EMR, чтобы мне не приходилось обновлять соединение вручную?

Ответы [ 2 ]

0 голосов
/ 02 мая 2019

Вы можете использовать переменные воздушного потока xcom для передачи значения из одной задачи в другую. В вашем сценарии использования вы можете передать значение EMR DNS из задачи создания EMR в задачу SSH через переменную XCOM.

Концепции воздушного потока Xcom

Отправка данных в xcom:

context['ti'].xcom_push(key="xcom_key", value="DNS_NAME")

извлечение данных из xcom:

context['ti'].xcom_pull(key="xcom_key", task_ids="EMR_Task")
0 голосов
/ 02 мая 2019

После того, как я еще немного покопался в CLI Airflow, я обнаружил, что можно создавать / удалять новые соединения.Я добавил оператор bash после создания кластера EMR, чтобы добавить соединение с Airflow.

airflow connections --delete --conn_id aws_emr

airflow connections --add --conn_id aws_emr --conn_type SSH --conn_host publicDNS --conn_login username --conn_extra {"key_file":"file.pem"}

...