У меня есть сценарий спарк, который должен сделать 60 вызовов API для каждой строки. В настоящее время я использую BigQuery в качестве хранилища данных. Мне было интересно, есть ли способ использовать BigQuery API или BigQuery Storage API для запроса базы данных из моего udf? Может быть, способ выполнить пакетные запросы? pandas-gbq будет лучшим решением? Каждый запрос, который мне нужно сделать для каждой строки, является запросом select count(*) from dataset.table where {...}
.
В настоящее время я использую клиент большого запроса, как показано в фрагменте кода ниже, но я не уверен, что это лучший способиспользовать мои ресурсы. Извиняюсь, если код не сделан должным образом для этого варианта использования, я новичок в спринге и BigQuery.
def clients():
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/hadoop/credentials.json'
credentials, your_project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
# Make clients.
bqclient = bigquery.Client(
credentials=credentials,
project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=credentials
)
return bqclient, bqstorageclient
def query_cache(query):
bqclient, bqstorageclient = clients()
dataframe = (
bqclient.query(query)
.result()
.to_dataframe(bqstorage_client=bqstorageclient)
)
return dataframe['f0_'][0]
@pandas_udf(schema(), PandasUDFType.GROUPED_MAP)
def calc_counts(df):
query = "select count(*) from dataset.table where ...{some column filters}..."
df['count'] = df.apply(query_cache, args=(query), axis=1)