Воздушный поток - неожиданный аргумент ключевого слова dag - PullRequest
0 голосов
/ 21 июня 2019

Я знаю, что уже есть функция воздушного потока, которая передает файл из облачного хранилища в Big Query. Как и я, я установил соединение внутри сценария с GCP так же, как без воздушного потока, я вызвал PythonOperator для вызова функция, которую я настроил в скрипте для чтения облачного хранилища и вставки данных из файла в Big Query, однако я получаю сообщение об ошибке: «получил неожиданный аргумент ключевого слова« dag »»

Кажется, это довольно просто решить, но я действительно не знаю, что это значит, поскольку я поместил атрибуты DAG в PythonOperator

import json
import decimal
import airflow
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mssql_hook import MsSqlHook
from tempfile import NamedTemporaryFile
import pymssql  
import logging
import os
# import cloudstorage as gcs
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': False,
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test_tab1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    dagrun_timeout=timedelta(minutes=60)
)

try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = "/usr/local/airflow/key/key.json"

#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path 

def insert_bigquery(self):
    bigquery_client = bigquery.Client(project="project-name")
    dataset_ref = bigquery_client.dataset('bucket-name')
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    job_config.clustering_fields = ["id"]
    #job_config.field_delimiter = ";"
    uri = "gs://bucket-name/"+filename
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table('tab1'),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')


json_gcs_to_bq = PythonOperator(
    task_id='json_gcs_to_bq',
    python_callable=insert_bigquery,
    provide_context=True,
    dag=dag)

Сообщение об ошибке:

[2019-06-21 15:45:40,732] {{models.py:1760}} ERROR - insert_bigquery() got an unexpected keyword argument 'dag'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: insert_bigquery() got an unexpected keyword argument 'dag'

1 Ответ

0 голосов
/ 22 июня 2019

Вам не нужно анализировать self в ваш python_callable.Измените параметр вашей функции insert_bigquery как def insert_bigquery (ds, ** kwargs) вместо def insert_bigquery (self) .

Ссылка: https://airflow.apache.org/howto/operator/python.html

...