Преобразование файла Python в Airflow DAG - PullRequest
0 голосов
/ 11 июня 2018

У меня есть этот файл Python:

class Get:

    def __init__(self, i):
        self.i = get_date(i)
        self.df = self.get_file()

    def get_file(self):
        try:
            ...
            return df
        except Exception as e:
            return ...

    def get_date(self,i):
        dt = datetime.now() - timedelta(days=i)
        return dt.strftime("%Y-%m-%d")

    def put(self,df):
        ....


class Fix:
    def __init__(self,df):
        ....

if __name__ == '__main__':
    for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)

По сути, этот код генерирует 4 последние даты и выполняет функции в эти даты (обновление статистики и т. Д.)

Всначала Я хотел преобразовать каждую функцию в PythonOperator, а затем запланировать ее, но я не думаю, что это сработает.Я не знаю, как преобразовать классы и параметры, которые передаются между ними.

Вот что делает код, если я запускаю его в 2018-Jun-12 и ниже того, что должно быть с Airflow:enter image description here

Есть ли шаблон, который я могу использовать или какие-либо предложения, как это сделать?

1 Ответ

0 голосов
/ 27 сентября 2018

вы можете выполнить ваш скрипт с помощью BashOperator без каких-либо изменений в вашем скрипте:

dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='daily', 
default_args=default_args)

t1 = BashOperator(
    task_id = '{NAME_OF_TASK}',
    dag = dag,
    bash_command = python {NAME_OF_THE_FILE_TO_EXECUTE}.py')

или использовать PythonOperator:

  1. обновить код для создания main функция в вашем скрипте:

    def main():
        for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)
    
  2. определить и выполнить dag:

    dag = DAG('{NAME_OF_THE_TASK}', schedule_interval = 'daily', 
    default_args=default_args)
    
    t1 = PythonOperator(
        task_id = '{NAME_OF_TASK}',
        dag = dag,
        python_callable = main)
    
...