Использование start_bundle () в работе Apache-Beam не работает.Непередаваемое хранилище. Клиент () - PullRequest
0 голосов
/ 24 мая 2018

Я получаю эту ошибку

pickle.PicklingError: Выборка клиентских объектов явно не поддерживается.Клиенты имеют нетривиальное состояние, которое является локальным и недоступным для выбора.

При попытке использовать beam.ParDo для вызова функции, которая выглядит следующим образом

class ExtractBlobs(beam.DoFn):
    def start_bundle(self):
        self.storageClient = storage.Client()

    def process(self, element):
        client = self.storageClient
        bucket = client.get_bucket(element)
        blobs = list(bucket.list_blobs(max_results=100))
        return blobs

Я думал, что весь смыслиз start_bundle должен был инициализировать self.someProperty, а затем использовать этот self.someProperty в методе 'process', чтобы избавиться от проблемы травления (из источников ниже). Может ли кто-нибудь указать мне правильное направление, как решить эту проблему?

[+] Что я прочитал:

https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

Как устранить ошибку засоления в классе apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum

1 Ответ

0 голосов
/ 25 мая 2018

ОБНОВЛЕНО: проблема была на самом деле проблема библиотеки.У меня должна была быть правильная версия apache-beam SDK с правильными версиями Google-Cloud:

gapic-google-cloud-pubsub-v1 == 0.15.4

gax-google-logging-v2 == 0.8.3

gax-google-pubsub-v1 == 0.8.3

google-api-core == 1.1.2 google-api-python-client ==1.6.7

google-apitools == 0.5.10

google-auth == 1.4.1

google-auth-httplib2 == 0.0.3

google-cloud-bigquery == 1.1.0

google-cloud-core == 0.28.1

google-cloud-datastore == 1.6.0

google-cloud-pubsub == 0.26.0

google-cloud-storage == 1.10.0

google-gax == 0.12.5

apache-beam == 2.3.0

Я смог решить эту проблему с помощью комбинации вещей, во-первых, я ничего не сериализовал (уродливый один лайнер в выходной) а второй использует threading.local ()

class ExtractBlobs(beam.DoFn):
    def start_bundle(self):
        self.threadLocal = threading.local()
        self.threadLocal.client = storage.Client()

    def process(self, element):
        yield list(storage.Client().get_bucket(element).list_blobs(max_results=100))
...