API хранилища BigQuery: наилучшая практика использования клиента из UDF Spark Pandas? - PullRequest
1 голос
/ 11 декабря 2019

У меня есть сценарий спарк, который должен сделать 60 вызовов API для каждой строки. В настоящее время я использую BigQuery в качестве хранилища данных. Мне было интересно, есть ли способ использовать BigQuery API или BigQuery Storage API для запроса базы данных из моего udf? Может быть, способ выполнить пакетные запросы? pandas-gbq будет лучшим решением? Каждый запрос, который мне нужно сделать для каждой строки, является запросом select count(*) from dataset.table where {...}.

В настоящее время я использую клиент большого запроса, как показано в фрагменте кода ниже, но я не уверен, что это лучший способиспользовать мои ресурсы. Извиняюсь, если код не сделан должным образом для этого варианта использования, я новичок в спринге и BigQuery.

def clients():
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/hadoop/credentials.json'
    credentials, your_project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    # Make clients.
    bqclient = bigquery.Client(
        credentials=credentials,
        project=your_project_id,
    )
    bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=credentials
    )
    return bqclient, bqstorageclient

def query_cache(query):
    bqclient, bqstorageclient = clients()

        dataframe = (
            bqclient.query(query)
                .result()
                .to_dataframe(bqstorage_client=bqstorageclient)
        )
        return dataframe['f0_'][0]

@pandas_udf(schema(), PandasUDFType.GROUPED_MAP)
def calc_counts(df):
    query = "select count(*) from dataset.table where ...{some column filters}..."
    df['count'] = df.apply(query_cache, args=(query), axis=1)

1 Ответ

0 голосов
/ 11 декабря 2019

Более простой вариант - использовать spark-bigquery-connector , который позволяет напрямую запрашивать BigQuery и получать результат в виде фрейма данных Spark. Преобразование этого кадра данных в панды тогда просто:

spark_df = spark.read.format('bigquery').option('table', table).load()
pandas_df = spark_df.toPandas()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...