BiqQuery Storage.Python.Чтение нескольких потоков в параллельном выпуске (многопроцессорная обработка) - PullRequest
0 голосов
/ 25 сентября 2019

Я пытаюсь использовать BALANCED ShardingStrategy, чтобы получить более 1 потока и многопроцессорную библиотеку Python для параллельного чтения потока.

Однако при параллельном чтении потоков возвращается одинаковое количество строк и данных.Поскольку, если я правильно понимаю, никакие данные не назначаются ни одному потоку до того, как он начнет читать и завершится, поэтому два параллельных потока пытаются прочитать одни и те же данные, и в результате часть данных никогда не будет прочитана.

ИспользованиеС помощью стратегии LIQUID мы можем читать все данные из одного потока, который не может быть разделен.

В соответствии с документацией возможно считывание нескольких потоков параллельно с BALANCED.Тем не менее, я не могу понять, как читать параллельно и назначать разные данные каждому потоку

У меня есть следующий игрушечный код:

import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
from multiprocessing import Pool
import multiprocessing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "ethereum_blockchain"
table_ref.table_id = "contracts"

parent = "projects/{}".format(your_project_id)
session = bq_storage_client.create_read_session(
    table_ref,
    parent,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

def read_rows(stream_position, session=session):
    reader = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]), timeout=100000).to_arrow(session).to_pandas()
    return reader

if __name__ ==  '__main__': 
    p = Pool(2)
    output = p.map(read_rows,([i for i in range(0,2)]))
    print(output)

Нужна помощь для чтения нескольких потоков впараллельно.Возможно, есть способ назначить данные потоку до начала чтения.Будем благодарны за любые примеры кода или пояснения и подсказки

...