Как использовать BigQueryOperator с execute_date? - PullRequest
0 голосов
/ 18 декабря 2018

Это мой код:

EXEC_TIMESTAMP  = "{{  execution_date.strftime('%Y-%m-%d %H:%M')  }}"
query = """
        select ... where date_purchased between TIMESTAMP_TRUNC(cast ( {{ params.run_timestamp }} as TIMESTAMP), HOUR, 'UTC') ...
        """
generate_op = BigQueryOperator(
                    bql=query,
                    destination_dataset_table=table_name,
                    task_id='generate',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    query_params={'run_timestamp': EXEC_TIMESTAMP},
                    dag=dag)

Это должно работать, но это не так.Вкладка рендеринга показывает мне:

between TIMESTAMP_TRUNC(cast (  as TIMESTAMP), HOUR, 'UTC')

Дата отсутствует.Он превращается в ничто.

Как я могу это исправить?Для этого оператора нет provide_context=True.Я не знаю, что делать.

Ответы [ 2 ]

0 голосов
/ 19 декабря 2018

Проблема в том, что вы используете query_params, которое не является шаблонным полем, как упомянуто @dlamblin.

Используйте следующий код, который напрямую использует execution_date дату внутри bql:

import airflow
from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime,timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
import os


CONNECTION_ID = Variable.get("Your_Connection")

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 12, 27, 11, 15),
    'retries': 4,
    'retry_delay': timedelta(minutes=10)
}


dag = DAG(
    dag_id='My_Test_DAG',
    default_args=args,
    schedule_interval='15 * * * *',
    max_active_runs=1,
    catchup=False,
)

query = """select customers_email_address as email, 
   from mytable
   where 
    and date_purchased = TIMESTAMP_SUB(TIMESTAMP_TRUNC(cast ({{  execution_date.strftime('%Y-%m-%d %H:%M')  }} as TIMESTAMP), HOUR, 'UTC'), INTERVAL 1 HOUR) """

create_orders_temp_table_op = BigQueryOperator(
                    bql = query,
                    destination_dataset_table='some table',
                    task_id='create_orders_temp_table',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    dag=dag)

start_task_op = DummyOperator(task_id='start_task', dag=dag)


start_task_op  >> create_orders_temp_table_op
0 голосов
/ 18 декабря 2018

Луис, query_params - это не params, на который вы можете ссылаться в контексте шаблонов.Они не добавлены к этому.А поскольку params пусто, ваш {{ params.run_timestamp }} равен либо "", либо None.Если вы измените это значение на params={'run_timestamp':…}, оно все равно будет иметь проблемы, поскольку значения params не являются шаблонными.Поэтому, когда вы используете шаблонное поле bql для включения {{ params.run_timestamp }}, вы получите именно то, что в params: {'run_timestamp': …str… } заполнено БЕЗ любого рекурсивного расширения этого значения.Вы должны получить {{ execution_date.strftime('%Y-%m-%d %H:%M') }}.

Позвольте мне переписать это для вас (но, возможно, у меня неправильно разыграли парены, не уверен):

generate_op = BigQueryOperator(
                    sql="""
select ...
where date_purchased between
  TIMESTAMP_TRUNC(cast('{{execution_date.strftime('%Y-%m-%d %H:%M')}}') as TIMESTAMP), HOUR, 'UTC')
...
                    """,
                    destination_dataset_table=table_name,
                    task_id='generate',
                    bigquery_conn_id=CONNECTION_ID,
                    use_legacy_sql=False,
                    write_disposition='WRITE_TRUNCATE',
                    create_disposition='CREATE_IF_NEEDED',
                    dag=dag,
)

Вы можете см. Поля bql и sql с шаблонами .Однако поле bql устарело и удалено в более позднем коде.

...