получить идентификатор задания от BigQueryOperator с помощью xcom - PullRequest
1 голос
/ 20 июня 2019

Я хочу получить идентификатор работы Bigquery в BigQueryOperator.

В файле bigquery_operator.py я увидел следующую строку:

context['task_instance'].xcom_push(key='job_id', value=job_id)

Я не знаю, является ли это идентификатором задания airflow или идентификатора задания BigQuery, если это идентификатор задания BigQuery, как я могу получить его с помощью xcom из последующей задачи?.

Я попытался сделать следующее в последующем Pythonoperator:

def write_statistics(**kwargs):
  job_id = kwargs['templates_dict']['job_id']
  print('tamir')
  print(kwargs['ti'].xcom_pull(task_ids='create_tmp_big_query_table',key='job_id'))
  print(kwargs['ti'])
  print(job_id)

t3 = BigQueryOperator(
        task_id='create_tmp_big_query_table',
        bigquery_conn_id='bigquery_default',
        destination_dataset_table= DATASET_TABLE_NAME,
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        sql = """
        #standardSQL...

1 Ответ

0 голосов
/ 21 июня 2019

Пользовательский интерфейс отлично подходит для проверки того, был ли записан XCom или нет, что я бы порекомендовал вам сделать еще до того, как вы попытаетесь сослаться на него в отдельной задаче, чтобы вам не пришлось беспокоиться о том, что вы выбираете это правильно или нет. Выберите задачу create_tmp_big_query_table -> Сведения об экземпляре задачи -> XCom. Это будет выглядеть примерно так:

xcom page

В вашем случае код выглядит правильно для меня, но я предполагаю, что ваша версия Airflow не имеет изменений, которые добавили сохранение идентификатора задания в XCom. Эта функция была добавлена ​​в https://github.com/apache/airflow/pull/5195,, которая в настоящее время только на master и в настоящее время не является частью самой последней стабильной версии (1.10.3). Убедитесь сами в версии 1.10.3 BigQueryOperator .

Вы можете подождать, пока он появится в выпуске (... иногда занимает некоторое время), запустить версию master с этим изменением или временно скопировать более новую версию оператора в качестве пользовательского оператора. , В последнем случае я бы посоветовал назвать что-то вроде BigQueryOperatorWithXcom с примечанием, чтобы заменить его встроенным оператором после его выпуска.

...