Как сохранить результат запроса SQL с помощью Airflow и использовать результат в условии if else? - PullRequest
0 голосов
/ 09 марта 2020

Я пытался использовать XCOM, но в качестве значения

class DWPostgresReturn(DWPostgresOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = DWPostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        return hook.get_records(
            self.sql,
            parameters=self.parameters)

with DWDAG(config) as dag:

    t1 = DWPostgresReturn(
       task_id='t1',
       postgres_conn_id='db_conn',
       sql="select output from table",
       config=config,
       start_date = dt.datetime(2020,3,9)

    )

    def get_records(**kwargs):
       ti = kwargs['ti']
       xcom = ti.xcom_pull(task_ids='t1')   
       string_to_print = 'Value in xcom is: {}'.format(xcom)
       print(xcom)



    t2 = PythonOperator(
       task_id='records',
       provide_context=True,
       python_callable=get_records,
       config = config,
       start_date = dt.datetime(2020,3,9)


    )

t1 >> t2
он не отображается.
...