Инициирование и чтение из нескольких потоков с помощью BigQuery Storage API (бета) - PullRequest
2 голосов
/ 18 мая 2019

BigQuery Storage API (https://googleapis.github.io/google-cloud-python/latest/bigquery_storage/gapic/v1beta1/api.html) невероятно полезен для чтения данных из таблицы BigQuery почти в 10 раз быстрее, чем стандартный API BigQuery. Чтобы сделать его еще быстрее, он поддерживает несколько потоков чтения, каждый из которых читает динамическивыделенный набор строк из соответствующей таблицы.

Моя проблема заключается в следующем: хотя вы можете запросить несколько потоков, выделенные потоки после запроса не находятся под вашим контролем. Поэтому я не смогинициировать более 1 потока.

Данные, которые я читаю, состоят из 3 столбцов и 6 миллионов строк, как вы можете видеть ниже. Я печатаю общее количество потоков, созданных для консоли.

from google.cloud import bigquery_storage_v1beta1

project_id = 'myproject'
client = bigquery_storage_v1beta1.BigQueryStorageClient()

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"

read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("year")
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")

# I request 3 streams to be created!
requested_streams = 3  

parent = "projects/{}".format(project_id)
session = client.create_read_session(
    table_ref, parent, table_modifiers=modifiers, read_options=read_options, 
    requested_streams=requested_streams
)  

response = client.batch_create_read_session_streams(session, requested_streams)

# I see only 1 stream being created.
print("Streams created: " + str(len(session.streams)))
print("Stream names array: " + str(session.streams))


reader = client.read_rows(
    bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)

rows = reader.rows(session)

names = set()

import time
start = time.time()
#---------------------------------------------------
i=0
for row in rows:
    i += 1
    names.add(row["name"])
    if i > 6000000:
        break
#---------------------------------------------------    
end = time.time()
print(end - start)
print("Got {} unique names and {} total rows.".format(len(names), i))

У меня есть несколько вопросов:

1) Я вижу только 1 поток, потому что многопотоковая реализация не завершена (API находится в бета-версии)?

2)Я вижу только 1 поток, потому что данные относительно «малы» для алгоритма распределения потоков?Строки размером уже 6 м.

3) Если бы я начал видеть создание нескольких потоков, документация API не описывает, как читать из них параллельно.Есть мысли о том, как это сделать?

1 Ответ

1 голос
/ 20 мая 2019

Проблема в том, что вы читаете таблицу, имея только один входной файл.Несмотря на то, что в нем 6 миллионов строк, данные легко сжимаются, и поэтому для данных имеется только один файл-столбец с резервной копией.В настоящее время API хранилища не будет разбирать данные более детально, чем этот.

Вы увидите то же самое (есть только один вход), если вы изучите план запроса, который ВЫБИРАЕТСЯ из этой таблицы.

...