Результаты BigQuery для Panda DataFrame в блоках - PullRequest
1 голос
/ 28 февраля 2020

Я пытаюсь сохранить результаты запроса BigQuery в фрейме данных Panda, используя bigquery.Client.query.to_dataframe()

Этот запрос может вернуть миллионы строк.

Учитывая, что Panda в BQ (Dataframe.to_gbq()) имеет параметр чанка, есть ли что-то похожее для BQ на Pandas для постепенного добавления к фрейму данных без необходимости многократного выполнения запроса с ограничением и смещением?

1 Ответ

1 голос
/ 17 марта 2020

Как уже упоминалось @William, вы можете разбивать результаты BigQuery на части и разбивать их на страницы, запрос будет требовать только одного выполнения. Я сделал этот код на основе официальной документации, используя в качестве демонстрации publi c Dataset: 'bigquery-publi c -data.baseball.games_wide':

import pandas as pd
import math

bq_client = bigquery.Client()

class BqToDfChunker(object):


    def __init__(self, query_job, results_per_page):
        bq_result = query_job.result()
        destination = query_job.destination
        destination =  bq_client.get_table(destination)

        self.destination = destination
        self.results_per_page = results_per_page
        self.num_pages = math.ceil(float(destination.num_rows/results_per_page))
        self.index = 0
        self.next_token = None


    def get_next_df_page(self):
        rows = bq_client.list_rows(self.destination,
           max_results = self.results_per_page,
           page_token = self.next_token)

        if self.index < self.num_pages:

            df = pd.DataFrame(rows)
            self.index += 1
            self.next_token = rows.next_page_token 

            return df

        else:
            return None


    def has_next(self):
        if self.index != self.num_pages:
            return True
        else:
            return False

if __name__ == '__main__':

    query = """
        SELECT homeTeamName FROM `bigquery-public-data.baseball.games_wide` group by homeTeamName
    """

    query_job = bq_client.query(query) 

    #initialize the class with the query_job and number_of_results_per_page
    bq_test = BqToDfChunker(query_job, 10)

    while bq_test.has_next():
        print(bq_test.get_next_df_page())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...