Apache Воздушный поток - повторное выполнение рецепта при каждом задании изменения даты и даты - PullRequest
0 голосов
/ 25 февраля 2020

Я новичок в использовании воздушного потока. Я заметил, что если вы определяете глобальную переменную (метку времени) в коде, это значение будет меняться для каждой задачи. Например, в самом базовом примере ниже c я определяю переменную сейчас , но каждый раз, когда я печатаю ее в задаче, это значение изменяется.

from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import time

now = int(time.time() * 1000)
RANGE = range(1, 10)


def init_step():
    print("Run on RANGE {}".format(RANGE))
    print("Date of the Scans {}".format(now))
    return RANGE


def trigger_step(index):
    time.sleep(10)
    print("index {} - date {}".format(index, now))
    return index


default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 2,
    'retry_delay': timedelta(minutes=15)
}

with DAG('test',
         default_args=default_args,
         schedule_interval='0 16 */7 * *',
         ) as dag:
    init = PythonOperator(task_id='init',
                          python_callable=init_step,
                          dag=dag)

    for index in init_step():
        run = PythonOperator(task_id='trigger-port-' + str(index),
                             op_kwargs={'index': index},
                             python_callable=trigger_step, dag=dag)

        dag >> init >> run

Это нормально? поведение? Есть ли способ изменить это?

...