тянуть xcom в BigQueryOperator - PullRequest
       24

тянуть xcom в BigQueryOperator

1 голос
/ 01 апреля 2019

Я пытаюсь запустить BigQueryOperator с некоторым динамическим параметром, основанным на предыдущей задаче с использованием xcom (мне удалось протолкнуть его с помощью BashOperator с xcom_push = True)

Я думал, что с помощью следующего можно добиться цели

def get_next_run_date(**context):
    last_date = context['task_instance'].xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip()
    last_date = datetime.strptime(last_date, "%Y%m%d").date()
    return last_date + timedelta(days=1)

t3 = BigQueryOperator(
    task_id='autoplay_calc',
    bql='autoplay_calc.sql',
    params={
            "env" : deployment
            ,"region" : region
            ,"partition_start_date" : get_next_run_date()
            },
    bigquery_conn_id='gcp_conn',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    #provide_context=True,
    destination_dataset_table=reporting_project + '.pa_reporting_public_batch.autoplay_calc',
    dag=dag
    )` 

, но с использованием вышеизложенного выдает ошибку Broken Dag с ошибкой task_instance.

Ответы [ 2 ]

0 голосов
/ 02 апреля 2019

Вы используете его неправильно.

Нельзя использовать xcom в params. Вам необходимо использовать его в параметре bql/sql. Ваш файл sql, autoplay_calc.sql может содержать что-то вроде

select * from XYZ where date == "{{xcom_pull(task_ids=['get_autoplay_last_run_date'])[0].rstrip() }}"
0 голосов
/ 02 апреля 2019

Вы пытались использовать контекст ['ti']. Xcom_pull ()?

...