Воздушный поток: как получить значение xcom от BigQueryOperator? - PullRequest
0 голосов
/ 14 октября 2018

Это мой оператор:

bigquery_check_op = BigQueryOperator(
    task_id='bigquery_check',
    bql=SQL_QUERY,
    use_legacy_sql = False,
    bigquery_conn_id=CONNECTION_ID,
    trigger_rule='all_success',
    xcom_push=True,
    dag=dag
)

Когда я проверяю страницу визуализации в пользовательском интерфейсе.Там ничего не появляется.Когда я запускаю SQL в консоли, он возвращает значение 1400, которое является правильным.Почему оператор не нажимает XCOM?

Я не могу использовать BigQueryValueCheckOperator.Этот оператор предназначен для СБОЙ против проверки значения.Я не хочу ничего потерпеть неудачу.Я просто хочу разветвить код на основе возвращаемого значения из запроса.

Ответы [ 2 ]

0 голосов
/ 27 декабря 2018

Вот как вы могли бы сделать это с помощью BigQueryHook и BranchPythonOperator:

from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.hooks import BigQueryHook

def big_query_check(**context):
    sql = context['templates_dict']['sql']
    bq = BigQueryHook(bigquery_conn_id='default_gcp_connection_id',
                        use_legacy_sql=False)
    conn = bq.get_conn()
    cursor = conn.cursor()
    results = cursor.execute(sql)

    # Do something with results, return task_id to branch to
    if results = 0:
        return "task_a"
    else:
        return "task_b"


sql = "SELECT COUNT(*) FROM sales"


branching = BranchPythonOperator(
    task_id='branching',
    python_callable=big_query_check,
    provide_context= True,
    templates_dict = {"sql": sql}
    dag=dag,
)

Сначала мы создаем вызываемый Python, который мы можем использовать для выполнения запроса, и выбираем, какой task_id также разветвляется,Во-вторых, мы создаем BranchPythonOperator.

0 голосов
/ 26 декабря 2018

Самый простой ответ заключается в том, что xcom_push не является ни одним из параметров в BigQueryOperator, ни BaseOperator, ни LoggingMixin.

BigQueryGetDataOperator возвращает (и, следовательно, выдвигает) некоторые данные, но работает по имени таблицы и столбца.Вы можете связать это поведение, выполнив запрос, который вы выводите в таблицу с уникальным именем (возможно, используйте {{ds_nodash}} в имени), а затем использовать таблицу в качестве источника для этого оператора, и затем вы можетеветвь с BranchPythonOperator.

Вместо этого вы можете попробовать использовать BigQueryHook get_conn().cursor() для запуска запроса и работы с некоторыми данными внутри BranchPythonOperator.

В другом месте мы болтали и придумали что-то вроде этого для вставки вызываемого BranchPythonOperator:

cursor = BigQueryHook(bigquery_conn_id='connection_name').get_conn().cursor()
# one of these two:
cursor.execute(SQL_QUERY)  # if non-legacy
cursor.job_id = cursor.run_query(bql=SQL_QUERY, use_legacy_sql=False)  # if legacy
result=cursor.fetchone()
return "task_one" if result[0] is 1400 else "task_two"  # depends on results format
...