У меня есть следующий игрушечный код:
import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"
parent = "projects/{}".format(your_project_id)
session = client.create_read_session(
table_ref,
parent,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)
df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df
Я использовал BALANCED ShardingStrategy, чтобы инициировать более 1 потока, который можно читать независимо.
Документация BigqueryStorage гласит:
Однако, если вы хотите развернуть несколько считывателей, вы можете сделать это, обработав считыватель каждый отдельный поток.
Я инициировал двух читателей, по одному для каждого из потоков в сеансе.После этого два кадра данных (по одному для каждого считывателя) объединяются в один.Однако этот подход не дает никакой скорости по сравнению с LIQUID ShardingStrategy.
Я пытаюсь заставить оба читателя читать строки параллельно.Однако я не смог найти ничего о чтении параллельных потоков в документации библиотеки.
Вопросы:
1) Предоставляет ли BugQuery Storage какие-либо собственные средства для одновременного чтения нескольких потоков, если выбран BALANCED ShardingStrategy?
2) Как лучше всего читатьпоток в параллель?Нужно ли для этого использовать многопроцессорность или asyncio?
3) Буду признателен, если кто-нибудь предоставит какой-нибудь базовый пример для параллельных потоков, перечитывающих