Как сохранить полученные извне данные о состоянии в Python Apache-Beam? - PullRequest
0 голосов
/ 05 июня 2019

В моей работе apache-beam я называю внешний источник, GCP Storage, это можно рассматривать как вызов http для универсальных целей, важной частью является то, что это внешний вызов для обогащения работы.

Каждый фрагмент данных, который я обрабатываю, я называю этим API, чтобы получить некоторую информацию для обогащения данных.В API имеется большое количество повторных обращений к одним и тем же данным.

Существует ли хороший способ кэшировать или сохранять результаты для повторного использования для каждого фрагмента данных, обрабатываемого для ограничения объема требуемого сетевого трафика.Это массивное узкое место для обработки.

Ответы [ 2 ]

0 голосов
/ 13 июня 2019

Вы можете рассмотреть сохранение этого значения в качестве состояния экземпляра на вашем DoFn. Например

class MyDoFn(beam.DoFn):
    def __init__(self):
        # This will be called during construction and pickled to the workers.
        self.value1 = some_api_call()

    def setup(self):
        # This will be called once for each DoFn instance (generally
        # once per worker), good for non-pickleable stuff that won't change.
        self.value2 = some_api_call()

    def start_bundle(self):
        # This will be called per-bundle, possibly many times on a worker.
        self.value3 = some_api_call()

    def process(self, element):
        # This is called on each element.
        key = ...
        if key not in self.some_lru_cache:
            self.some_lru_cache[key] = some_api_call()
        value4 = self.some_lru_cache[key]
        # Use self.value1, self.value2, self.value3 and/or value4 here.
0 голосов
/ 05 июня 2019

В Beam нет внутреннего слоя постоянства.Вы должны загрузить данные, которые вы хотите обработать.И это может произойти с парком рабочих, которые все должны иметь доступ к данным.

Однако вы можете рассмотреть возможность доступа к вашим данным в качестве побочного ввода.Вам придется предварительно загрузить все данные и не нужно будет запрашивать внешний источник для каждого элемента: https://beam.apache.org/documentation/programming-guide/#side-inputs

В частности, для GCS вы можете попытаться использовать существующий ввод-вывод, например TextIO: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java

...