Я новичок в использовании воздушного потока. Я заметил, что если вы определяете глобальную переменную (метку времени) в коде, это значение будет меняться для каждой задачи. Например, в самом базовом примере ниже c я определяю переменную сейчас , но каждый раз, когда я печатаю ее в задаче, это значение изменяется.
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import time
now = int(time.time() * 1000)
RANGE = range(1, 10)
def init_step():
print("Run on RANGE {}".format(RANGE))
print("Date of the Scans {}".format(now))
return RANGE
def trigger_step(index):
time.sleep(10)
print("index {} - date {}".format(index, now))
return index
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'retries': 2,
'retry_delay': timedelta(minutes=15)
}
with DAG('test',
default_args=default_args,
schedule_interval='0 16 */7 * *',
) as dag:
init = PythonOperator(task_id='init',
python_callable=init_step,
dag=dag)
for index in init_step():
run = PythonOperator(task_id='trigger-port-' + str(index),
op_kwargs={'index': index},
python_callable=trigger_step, dag=dag)
dag >> init >> run
Это нормально? поведение? Есть ли способ изменить это?