Почему оператор TimeSensor дает мне эту ошибку - PullRequest
0 голосов
/ 08 октября 2019

сбой воздушного потока TimeDeltaSensor с неподдерживаемым типом операнда

Я использую Воздушный поток в "Google Cloud Composer" , и мне нужнополучить данные из "FaceBook Ads" для разных учетных записей, поэтому мне нужно поместить несколько секунд задержки между двумя вызовами. Я пытался с TimeDeltaSensor , но вставка вызова этого оператора, в цепочке dag, отправляет код по ошибке.

Это мой dag:

dag_start_date = get_yesterday(at_midnight=True)
dag_args = get_dag_args(FLOW, dag_name, dag_start_date=dag_start_date)
with DAG(dag_id=dag_name, default_args=dag_args, start_date=dag_start_date, schedule_interval='@once') as dag:
    worker = FacebookAdsToBigQuery(FLOW, dag)
    end_date = dag_start_date
    start_date = end_date - timedelta(days=3)
    worker.run_dag(start_date, end_date)

А это кусочек кода о TimeDeltaSensor:

....
        for level in FacebookAdsToBigQuery.LEVELS:
            table_schema_fname = self.level_table_schema_file[level]
            schema_fields = bb_utils.load_json_file(table_schema_fname)
            for j, account_id in enumerate(self.account_ids):
                tsk_prefix = f"{self.dag.dag_id}_{account_id}_{level}"
                delta = j * 10
                if delta > 0:
                    wait_x_nextact_tsk = TimeDeltaSensor(task_id=f'{tsk_prefix}_wait_{delta}',
                                                         delta=timedelta(seconds=delta), dag=self.dag)
                else:
                    wait_x_nextact_tsk = DummyOperator(task_id=f'{tsk_prefix}_nowait', dag=self.dag)

....

Это ошибка:

[2019-10-08 11: 18: 25,547] {base_task_runner.py:101} ИНФОРМАЦИЯ - задание 199692: подзадача facebook_ads_233070883986464_ad_wait_40 [2019-10-08 11: 18: 25,546] {cli.py:520} ИНФОРМАЦИЯ - работает на хосте airflow-worker-5fd4678587-l6nlf

1022 * 1022[2019-10-08 11: 18: 25,642] {models.py:1796} ОШИБКА - неподдерживаемые типы операндов для + =: 'NoneType' и 'datetime.timedelta'

Theпричина может заключаться в том, что оператор не может понять параметр dag schedule_interval = '@ Once' ?

Спасибо

...