Как напрямую записать результат запроса в корзину Google Cloud Storage? - PullRequest
0 голосов
/ 07 июня 2018
from google.cloud import bigquery  
query = """ select * from emp where emp_name=@emp_name""" 
query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')] 
job_config = bigquery.QueryJobConfig() 
job_config.query_parameters = query_params  
client = bigquery.Client() 
query_job = client.query(query, job_config=job_config) 
result = query_job.result()

Как я могу записать результат в Google Cloud Storage вместо того, чтобы записать его в CSV и загрузить его в корзину облачного хранилища?

Ответы [ 4 ]

0 голосов
/ 26 июля 2019
#THIS IS THE CODE I AM RUNNING

# Set the destination table
for i in range(1,13):
table_ref = client.dataset("newdataset").table("chicago_months_increment")
job_config.destination = table_ref
job_config.allow_large_results = True


query_job = client.query('SELECT * FROM `bigquery-public- 
data.chicago_taxi_trips.taxi_trips` WHERE (Select EXTRACT(MONTH from 
trip_start_timestamp) )=i;',
location='US', # Location must match dataset
job_config=job_config)
rows = list(query_job) # Waits for the query to finish


query_job.result() 

# Export table to GCS
destination_uri = "gs://monthly-data/month-"+i+"-*.csv"
dataset_ref = client.dataset("newdataset", project="chicago-project-247714")
table_ref = dataset_ref.table("chicago_months_increment")



extract_job = client.extract_table(
table_ref,
destination_uri,
location='US')
extract_job.result()  # Waits for job to complete
client.delete_table(table_ref) #Deletes table in BQ

#ERROR I AM GETTING
---------------------------------------------------------------------------
BadRequest                                Traceback (most recent call last)
<ipython-input-5-e176648eba58> in <module>()
  9     location='US', # Location must match dataset
 10     job_config=job_config)
---> 11     rows = list(query_job) # Waits for the query to finish
 12 
 13 

 /home/amiteshwar/.local/lib/python2.7/site- 

packages / google / cloud / bigquery / job.pyc в iter (self) 2988 2989 def iter (self): -> 2990 return iter (self.result ()) 2991 2992

/home/amiteshwar/.local/lib/python2.7/site- 

packages / google / cloud / bigquery / job.pyc в result (self, timeout, page_size, retry) 2875 Если задание не было выполнено в течение заданного времени ожидания.2876 "" "-> 2877 super (QueryJob, self) .result (timeout = timeout) 2878 # Возвращать итератор вместо возврата задания. 2879, если не self._query_results:

/home/amiteshwar/.local/lib/python2.7/site- 

packages / google/cloud/bigquery/job.pyc в результате (self, timeout, retry) 731 self._begin (retry = retry) 732 # TODO: изменить PollingFuture, чтобы он мог передать аргумент повторения в done (). -> 733 return super(_AsyncJob, self) .result (timeout = timeout) 734 735 def отменено (self):

/home/amiteshwar/.local/lib/python2.7/site- 
packages/google/api_core/future/polling.pyc in result(self, timeout)
125             # pylint: disable=raising-bad-type
126             # Pylint doesn't recognize that this is valid in this case.
--> 127             raise self._exception
128 
129         return self._result

BadRequest: 400 Unrecognized name: i at [1:125]
0 голосов
/ 07 июня 2018

В зависимости от вашего конкретного случая использования (частота экспорта, размер экспорта и т. Д.) Решения, предложенные в ответе @GrahamPolley, могут работать для вас, хотя они потребуют больше внимания и развития.

Текущая возможность для записи результатов запроса заключается либо в том, чтобы записать результаты в таблицу, либо в локальную загрузку, и даже загрузка непосредственно в CSV имеет некоторые ограничения .Следовательно, нет возможности напрямую записывать результаты запроса в GCS в формате CSV.Однако существует двухэтапное решение, состоящее из:

  1. Запись результатов запроса в таблицу BQ
  2. Экспорт данных из таблицы BQ вCSV-файл в GCS .Обратите внимание, что эта функция имеет некоторые ограничения , но они не такие узкие.

Следующий код Python может дать вам представление о том, как выполнить эту задачу:

from google.cloud import bigquery
client = bigquery.Client()

# Write query results to a new table
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("DATASET").table("TABLE")
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
    location='US', # Location must match dataset
    job_config=job_config)
rows = list(query_job)  # Waits for the query to finish


# Export table to GCS
destination_uri = "gs://BUCKET/FILE.CSV"
dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
table_ref = dataset_ref.table("TABLE")

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US')
extract_job.result()  # Waits for job to complete

Обратите внимание, что после этого вам придется удалить таблицу (вы также можете сделать это программно).Это может быть не лучшим решением, если вам нужно автоматизировать процесс (если это ваш вариант использования, возможно, вам следует лучше изучить решения @ Грэма), но это подойдет для простого сценария.

0 голосов
/ 22 ноября 2018

@ dsesto ответ был весьма полезен для меня.Я использовал его код и добавил несколько дополнительных строк, чтобы запросить BigQuery, записать результат в таблицу, затем экспортировать в GCS и импортировать результат в Dask DataFrame.Код обернут в функцию.

def df_from_bq(query:str,table=None,compute=False):

from time import gmtime, strftime
from google.cloud import bigquery#y, storage 
import dask.dataframe as dd
import gcsfs

client = bigquery.Client.from_service_account_json('YOUR_PATH') #Authentication if BQ using ServiceKey
project = 'YOUR_PROJECT'

table_name = 'result_'+str(strftime("%Y%m%d_%H%M%S", gmtime())) if table==None else table #Creates custome table name if no name is defined

job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("YOUR_DATASET").table(table_name)
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE #Creates the table with query result. Overwrites it if the table exists

query_job = client.query(
    query,
    location='US', 
    job_config=job_config)
query_job.result() 
print('Query results loaded to table {}'.format(table_ref.path))

destination_uri = "gs://YOUR_BUCKET/{}".format(table_name+'_*'+'.csv') 
dataset_ref = client.dataset("YOUR_DATASET", project=project)
table_ref = dataset_ref.table(table_name)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US') 
extract_job.result() #Extracts results to the GCS

print('Query results extracted to GCS: {}'.format(destination_uri))

client.delete_table(table_ref) #Deletes table in BQ

print('Table {} deleted'.format(table_name))

gcs = gcsfs.GCSFileSystem(project=project, token='cache') 
df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name+'_*'+'.csv'),  storage_options={'token': gcs.session.credentials})

#storage_client = storage.Client.from_service_account_json('C:\\Users\o.korshun\Documents\o.korshun.json')
#bucket = storage_client.get_bucket('plarium-analytics')
#blob = bucket.blob(table_name+'.csv')
#blob.delete() #Uncomment if you need to delete Blob after the DataFrame is created

#print('Blob {} deleted'.format(table_name+'.csv'))
print('Results imported to DD!')

return df if compute == False else df.compute().reset_index(in_place=True)

Обратите внимание, что таблица в BQ удаляется после импорта результата в облачное хранилище.

0 голосов
/ 07 июня 2018

BigQuery не поддерживает запись результатов запроса непосредственно в GCS.Вам нужно будет записать результаты в таблицу, а затем экспортировать таблицу в GCS после ее материализации.Возможно, вы могли бы использовать Cloud Composer для организации этого для вас.

Или же вы могли бы использовать конвейер потока данных для достижения желаемого результата за один раз.Но это немного больше работы и будет стоить больше денег.Идея состоит в том, чтобы написать конвейер для чтения из BigQuery с использованием вашего SQL-запроса, а затем записать результаты в GCS.Это также будет медленнее.

...