Как предотвратить «Ошибка выполнения: [Errno 32] Разбитая труба» в Airflow - PullRequest
0 голосов
/ 03 марта 2019

Я только начал использовать Airflow для координации нашего конвейера ETL.

Я столкнулся с ошибкой конвейера при запуске dag.

Я видел общее обсуждение stackoverflow здесь .

Мой случай больше касается стороны воздушного потока.Согласно обсуждению в этом посте, возможная основная причина:

Ошибка прерванного канала обычно возникает, если ваш запрос заблокирован или занимает слишком много времени, а после тайм-аута на стороне запроса он закрываетсоединение, а затем, когда отвечающая сторона (сервер) попытается записать в сокет, он выдаст ошибку обрыва канала.

Это может быть реальной причиной в моем случае, у меня есть pythonoperatorэто запустит другую работу за пределами Airflow, и эта работа может быть очень продолжительной (например, 10+ часов), мне интересно, какой механизм в Airflow используется для предотвращения этой ошибки.

Можеткто-нибудь поможет?

UPDATE1 20190303-1:

Благодаря @ y2k-shubham для SSHOperator, я могу использовать его для успешной установки соединения SSH и могу выполнять несколько простых команд наудаленный сайт (действительно, по умолчанию ssh-соединение должно быть установлено на localhost, потому что задание находится на localhost), и я могу видеть правильный результат hostname, pwd.

Однако, когда я попыталсячтобы запустить реальное задание, я получил ту же ошибку, опять-таки, ошибка из jpipeline ob вместо dag / задачи Airflow.

UPDATE2: 20190303-2

У меня был успешный запуск(проверка воздушного потока) без ошибок, а затем последовал другой неудачный прогон (планировщик) с той же ошибкой из конвейера.

1 Ответ

0 голосов
/ 03 марта 2019

Хотя я бы посоветовал вам продолжать искать более изящный способ достижения того, чего вы хотите, я приведу пример использования в соответствии с просьбой


Сначала вы должны создать SSHHook.Это можно сделать двумя способами

  • Традиционным способом, когда вы вводите все необходимые настройки, такие как хост, пользователь, пароль (при необходимости) и т. Д. Из код клиента , где вы создаете экземпляр хука.Я приведу пример из test_ssh_hook.py, но вы должны тщательно пройти через SSHHook, а также его тесты, чтобы понять все возможные варианты использования

    ssh_hook = SSHHook(remote_host="remote_host",
                       port="port",
                       username="username",
                       timeout=10,
                       key_file="fake.file")
    
  • Airflow способ , где вы помещаете все детали соединения в Connection объект, которым можно управлять из пользовательского интерфейса и только передать его conn_id для создания экземпляраваш крюк

    ssh_hook = SSHHook(ssh_conn_id="my_ssh_conn_id")
    

    Конечно, если вы полагаетесь на SSHOperator, то вы можете напрямую передать ssh_conn_id оператору.

    ssh_operator = SSHOperator(ssh_conn_id="my_ssh_conn_id")
    

Теперь, если вы планируете иметь выделенную задачу для запуска команды над SSH, вы можете использовать SSHOperator.Опять же я привожу пример из test_ssh_operator.py, но просмотрите источники для лучшей картины.

 task = SSHOperator(task_id="test",
                    command="echo -n airflow",
                    dag=self.dag,
                    timeout=10,
                    ssh_conn_id="ssh_default")

Но тогда вы можетехотите выполнить команду через SSH как часть вашей более крупной задачи .В этом случае вам не нужен SSHOperator, вы все равно можете использовать только SSHHook.get_conn() метод SSHHook предоставляет вам экземпляр paramiko SSHClient.При этом вы можете запустить команду, используя exec_command() вызов

my_command = "echo airflow"
stdin, stdout, stderr = ssh_client.exec_command(
  command=my_command,
  get_pty=my_command.startswith("sudo"),
  timeout=10)

Если вы посмотрите на SSHOperator 'execute() метод, это довольно сложный (но надежный) кусок кода, пытающийся достичь очень простой вещи.Для собственного использования я создал несколько фрагментов , на которые вы, возможно, захотите взглянуть

  • . Для использования SSHHook независимо от SSHOperator посмотрите ssh_utils.py
  • Для оператора, который запускает несколько команд по SSH (вы можете добиться того же, используя bash s && operator ), см. MultiCmdSSHOperator
...