Как выполнить запрос BigQuery, а затем отправить выходной CSV-файл в облачное хранилище Google в Apache Airflow? - PullRequest
1 голос
/ 18 октября 2019

Мне нужно запустить скрипт BigQuery в Python, который должен быть выведен в виде CSV в облачном хранилище Google. В настоящее время мой сценарий вызывает большой код запроса и сохраняет его непосредственно на моем ПК.

Однако мне нужно запустить его в Airflow, чтобы у меня не было локальных зависимостей.

Мой текущийСценарий сохраняет выходные данные на моем локальном компьютере, а затем я должен переместить его в GCS. Посмотрел онлайн, и я не могу понять это. (ps я очень новичок в python, поэтому заранее извиняюсь, если об этом уже спрашивали!)

import pandas as pd
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

def run_script():

    df = pd.read_gbq('SELECT * FROM `table/veiw` LIMIT 15000',
                 project_id='PROJECT',
                 dialect='standard'
                 )

    df.to_csv('XXX.csv', index=False)

def copy_to_gcs(filename, bucket, destination_filename):

    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('storage', 'v1', credentials=credentials)

    body = {'name': destination_filename}
    req = service.objects().insert(bucket=bucket,body=body, media_body=filename)
    resp = req.execute()

current_date = datetime.date.today()
filename = (r"C:\Users\LOCALDRIVE\ETC\ETC\ETC.csv")
bucket = 'My GCS BUCKET'

str_prefix_datetime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
destfile = 'XXX' + str_prefix_datetime + '.csv'
print('')

    ```

1 Ответ

1 голос
/ 18 октября 2019

Airflow предоставляет несколько операторов для работы с BigQuery.

Вы можете увидеть пример выполнения запроса, за которым следует экспорт результатов в CSV в примерах кода Cloud Composer .

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Query recent StackOverflow questions.

bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id='bq_recent_questions_query',
    sql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(max_date=max_query_date, min_date=min_query_date),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id)

# Export query result to Cloud Storage.
export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_recent_questions_to_gcs',
    source_project_dataset_table=bq_recent_questions_table_id,
    destination_cloud_storage_uris=[output_file],
    export_format='CSV')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...