Python BigQuery Storage.Чтение нескольких потоков параллельно - PullRequest
0 голосов
/ 24 сентября 2019

У меня есть следующий игрушечный код:

import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"

parent = "projects/{}".format(your_project_id)
session = client.create_read_session(
    table_ref,
    parent,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)

df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df

Я использовал BALANCED ShardingStrategy, чтобы инициировать более 1 потока, который можно читать независимо.

Документация BigqueryStorage гласит:

Однако, если вы хотите развернуть несколько считывателей, вы можете сделать это, обработав считыватель каждый отдельный поток.

Я инициировал двух читателей, по одному для каждого из потоков в сеансе.После этого два кадра данных (по одному для каждого считывателя) объединяются в один.Однако этот подход не дает никакой скорости по сравнению с LIQUID ShardingStrategy.

Я пытаюсь заставить оба читателя читать строки параллельно.Однако я не смог найти ничего о чтении параллельных потоков в документации библиотеки.

Вопросы:

1) Предоставляет ли BugQuery Storage какие-либо собственные средства для одновременного чтения нескольких потоков, если выбран BALANCED ShardingStrategy?

2) Как лучше всего читатьпоток в параллель?Нужно ли для этого использовать многопроцессорность или asyncio?

3) Буду признателен, если кто-нибудь предоставит какой-нибудь базовый пример для параллельных потоков, перечитывающих

Ответы [ 2 ]

1 голос
/ 27 сентября 2019

Я провел некоторое исследование и понял, что вы использовали код из BigQuery Storage API, и вы правы: сбалансированная стратегия используется, если вы потребляете несколько потоков, необходимо отметить, что она все еще включенабета-версия.

Одна из причин, по которой это происходит, заключается в том, что, возможно, вы видите только 1 поток, поскольку данные относительно «малы» для алгоритма распределения потоков, число потоков может быть меньше, чем запрошенное число, в зависимости от 2факторы: разумный параллелизм для таблицы и ограничение сервиса.В настоящее время подробности алгоритма для определения того, что является «разумным», не являются общедоступными и могут измениться, как только API достигнет фазы общей доступности.

Также вы можете попробовать многопроцессорный пакет , который был рекомендован выше.

1 голос
/ 24 сентября 2019

BigQuery Storage API поддерживает несколько потоков, но ваш метод выполнения не поддерживает.Вы можете создать несколько экземпляров считывателей, тогда каждый из них может использовать отдельные потоки для увеличения пропускной способности.

У вас есть много вариантов Параллельная обработка в python .Однако самым простым в использовании является многопроцессорный пакет .

Другой вариант - использовать Apache Beam , который по умолчанию поддерживает параллельную обработку, но может не подходить для вашего варианта использования.Он имеет встроенный драйвер ввода-вывода BigQuery, но его версия на python еще не поддерживает API хранилища BigQuery, поэтому вам, возможно, придется написать собственную реализацию API хранилища BQ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...