Я пытаюсь использовать BALANCED ShardingStrategy, чтобы получить более 1 потока и многопроцессорную библиотеку Python для параллельного чтения потока.
Однако при параллельном чтении потоков возвращается одинаковое количество строк и данных.Поскольку, если я правильно понимаю, никакие данные не назначаются ни одному потоку до того, как он начнет читать и завершится, поэтому два параллельных потока пытаются прочитать одни и те же данные, и в результате часть данных никогда не будет прочитана.
ИспользованиеС помощью стратегии LIQUID мы можем читать все данные из одного потока, который не может быть разделен.
В соответствии с документацией возможно считывание нескольких потоков параллельно с BALANCED.Тем не менее, я не могу понять, как читать параллельно и назначать разные данные каждому потоку
У меня есть следующий игрушечный код:
import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
from multiprocessing import Pool
import multiprocessing
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "ethereum_blockchain"
table_ref.table_id = "contracts"
parent = "projects/{}".format(your_project_id)
session = bq_storage_client.create_read_session(
table_ref,
parent,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
def read_rows(stream_position, session=session):
reader = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]), timeout=100000).to_arrow(session).to_pandas()
return reader
if __name__ == '__main__':
p = Pool(2)
output = p.map(read_rows,([i for i in range(0,2)]))
print(output)
Нужна помощь для чтения нескольких потоков впараллельно.Возможно, есть способ назначить данные потоку до начала чтения.Будем благодарны за любые примеры кода или пояснения и подсказки