Воздушный поток - как использовать датчик из другой задачи датчика - PullRequest
0 голосов
/ 24 мая 2019

Я пытаюсь проверить, существует ли файл на удаленном сервере, если это так, проверьте, равно ли число строк 0. Если число строк больше 0, конвейер должен продолжаться, если нет, я хочу датчик продолжать проверку (в имени файла указана дата, поэтому на следующий день, возможно, новый файл будет не пустым)

Может ли кто-нибудь помочь пролить свет на то, как это реализовать? Я думаю, я могу использовать датчик SFTP из функции Python, который проверяет строки? Если да, то как я могу использовать датчик из другого? Большое спасибо

1 Ответ

1 голос
/ 24 мая 2019

Вы можете просто сделать обычный датчик, который выполняет обе эти задачи, вот схема того, как это реализовать, вы должны поместить этот файл в папку плагинов внутри воздушного потока, а затем импортировать и использовать его как частьDAG.

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators  import apply_defaults
from airflow.plugins_manager   import AirflowPlugin

import requests
import logging
import json

DEFAULT_CONNECTION_DETAILS = { "host": "127.0.0.1", "password": "wololo" }

log = logging.getLogger( __name__ )

class Remote_File_Row_Sensor( BaseSensorOperator ):

    @apply_defaults
    def __init__( self, file_name, connection_details= DEFAULT_CONNECTION_DETAILS, *args, **kwargs ):
        super( Remote_File_Row_Sensor, self ).__init__( *args, **kwargs )
        self.connection_details = connection_details
        self.file_name          = file_name

    def poke( self, context ):
        connection_details   = self.connection_details
        file_name            = self.file_name

        ROW_COUNT = 0

        # Your code here to connect using SFTP and read the file for the row count

        if ROW_COUNT == 0:
            return False
        else:
            return True

class Remote_File_Row_Plugin( AirflowPlugin ):
    name      = "remote_file_row_sensor"
    operators = [ Remote_File_Row_Sensor ]
...