BigQuery Storage API (https://googleapis.github.io/google-cloud-python/latest/bigquery_storage/gapic/v1beta1/api.html) невероятно полезен для чтения данных из таблицы BigQuery почти в 10 раз быстрее, чем стандартный API BigQuery. Чтобы сделать его еще быстрее, он поддерживает несколько потоков чтения, каждый из которых читает динамическивыделенный набор строк из соответствующей таблицы.
Моя проблема заключается в следующем: хотя вы можете запросить несколько потоков, выделенные потоки после запроса не находятся под вашим контролем. Поэтому я не смогинициировать более 1 потока.
Данные, которые я читаю, состоят из 3 столбцов и 6 миллионов строк, как вы можете видеть ниже. Я печатаю общее количество потоков, созданных для консоли.
from google.cloud import bigquery_storage_v1beta1
project_id = 'myproject'
client = bigquery_storage_v1beta1.BigQueryStorageClient()
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("year")
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")
# I request 3 streams to be created!
requested_streams = 3
parent = "projects/{}".format(project_id)
session = client.create_read_session(
table_ref, parent, table_modifiers=modifiers, read_options=read_options,
requested_streams=requested_streams
)
response = client.batch_create_read_session_streams(session, requested_streams)
# I see only 1 stream being created.
print("Streams created: " + str(len(session.streams)))
print("Stream names array: " + str(session.streams))
reader = client.read_rows(
bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)
rows = reader.rows(session)
names = set()
import time
start = time.time()
#---------------------------------------------------
i=0
for row in rows:
i += 1
names.add(row["name"])
if i > 6000000:
break
#---------------------------------------------------
end = time.time()
print(end - start)
print("Got {} unique names and {} total rows.".format(len(names), i))
У меня есть несколько вопросов:
1) Я вижу только 1 поток, потому что многопотоковая реализация не завершена (API находится в бета-версии)?
2)Я вижу только 1 поток, потому что данные относительно «малы» для алгоритма распределения потоков?Строки размером уже 6 м.
3) Если бы я начал видеть создание нескольких потоков, документация API не описывает, как читать из них параллельно.Есть мысли о том, как это сделать?