Я создал собственный оператор в Airflow, который вызывает API для извлечения данных, а затем записывает их в BigQuery. Однако проблема заключается в том, что я должен передать макрос execute_date как параметр API, чтобы иметь возможность вызывать данные за эту дату. К сожалению, когда я попытался сделать это, мой оператор не смог разобрать шаблон jinja, который я передал. Когда я проверил логирование, которое я сделал для этого, он показывает только шаблон, как показано на рисунке ниже. Я надеюсь, что вы, ребята, можете помочь.
Журналы воздушного потока
Это мой код для пользовательского оператора и метки. Спасибо!
...
class MyOperator(BaseOperator):
def __init__(self,date):
super(MyOperator,self).__init__(*arg,**kwargs)
self.date = date
def __pull_from_api(self):
api_link = "somelink.com/api/date={}".format(self.date)
data = request.get(api_link).json()
return data
def execute(self,context):
data = self.__pull_from_api()
...
dag = DAG('My Pipeline', default_args=default_args)
t1 = MyOperator(date='{{ execution_date}}', task_id='my_pipeline_1', dag=dag)
t1