Как определить операции оператора STFP на воздушном потоке? - PullRequest
0 голосов
/ 08 января 2020
class SFTPOperation(object):
    PUT = 'put'
    GET = 'get'  

operation=SFTPOperation.GET,
NameError: name 'SFTPOperation' is not defined

Я определил здесь операторы, но ничего не могу найти в inte rnet, относящемся к операциям

class sftpplugin(AirflowPlugin):
    name = "sftp_plugin"
    operators = [SFTPOperator]

Любая помощь приветствуется!

Спасибо,

1 Ответ

1 голос
/ 08 января 2020

Заметив, что оператор SFTP использует ssh_hook для открытия транспортного канала sftp, вам необходимо предоставить ssh_hook или ssh_conn_id для передачи файла. Во-первых, давайте рассмотрим пример, предоставляющий параметр ssh_conn_id.

from airflow.providers.sftp.operators import sftp_operator
from airflow import DAG
import datetime

dag = DAG(
'test_dag',
start_date = datetime.datetime(2020,1,8,0,0,0),
schedule_interval = '@daily'
)

put_operation = SFTPOperator(
            task_id="operation",
            ssh_conn_id="ssh_default",
            local_filepath="route_to_local_file",
            remote_filepath="remote_route_to_copy",
            operation="put",
            dag=dag
            )
get_operation = SFTPOperator(....,
            operation = "get",
            dag = dag
            )

put_operation >> get_operation

Обратите внимание, что dag должен быть запланирован так, как требуется вашей задачей, здесь в этом примере рассматривается ежедневное расписание, начиная с полудня. Теперь, если вы предоставляете SSHhook, необходимо внести следующие изменения в приведенный выше код

from airflow.contrib.hooks.ssh_hook import SSHHook
...

put_operation = SFTPOperator(
            task_id="operation",
            ssh_hook=SSHHook("Name_of_variable_defined"),
            ...
            dag=dag
            )
....

, где "Name_of_variable_defined" создается в Admin -> Соединения на интерфейсе Airflow.

...