Apache Beam: эквивалент DoFn.Setup в Python SDK - PullRequest
0 голосов
/ 28 октября 2018

Каков рекомендуемый способ сделать дорогую одноразовую инициализацию в Beam Python DoFn?В Java SDK есть DoFn.Setup , но в Beam Python нет эквивалента.

В настоящее время лучший способ присоединить объекты к threading.local() в инициализаторе DoFn?

Ответы [ 3 ]

0 голосов
/ 29 января 2019

Поток данных Python не особо прозрачен в отношении оптимального метода инициализации дорогих объектов.Существует несколько механизмов, с помощью которых объекты могут создаваться нечасто (в настоящее время не идеально выполнять именно однократную инициализацию).Ниже приведены некоторые из проведенных мной экспериментов и выводы, к которым я пришел.Надеюсь, кто-то из сообщества Beam поможет мне исправить то, что я заблудился.

__init__

Хотя метод __init__ можно использовать для инициализации дорогого объекта ровно один раз, эта инициализация делаетне бывает на рабочих машинах.Объект должен быть сериализован, чтобы его можно было отправить Рабочему, который для больших объектов, а также для моделей Tensorflow может быть довольно громоздким или вообще не работать.Кроме того, поскольку этот объект будет сериализован и отправлен по проводам, здесь не безопасно выполнять инициализацию, поскольку полезные данные могут быть перехвачены.Рекомендация против использования этого метода.

start_bundle()

Поток данных обрабатывает данные в дискретных группах, которые он называет пакетами.Они достаточно хорошо определены в пакетных процессах, но при потоковой передаче они зависят от пропускной способности.Нет никаких механизмов для настройки того, как Dataflow создает свои пакеты, и фактически размер пакета полностью определяется Dataflow.Метод start_bundle() будет вызываться на рабочем месте и может использоваться для инициализации состояния, однако эксперименты обнаруживают, что в контексте потоковой передачи этот метод вызывается чаще, чем требуется, и дорогостоящие повторные инициализации происходят довольно часто.

Ленивая инициализация

Эта методология была предложена документами Beam и на удивление является наиболее производительной.Ленивая инициализация означает, что вы создаете некоторый параметр с состоянием, который вы инициализируете для None, а затем выполняете код, такой как:

if self.expensive_object is None:
    self.expensive_object = self.__expensive_initialization()

Вы можете выполнить этот код непосредственно в вашем методе process().Вы также можете достаточно легко собрать некоторые вспомогательные функции, которые полагаются на состояние global, чтобы вы могли иметь такие функции, как (пример того, как это может выглядеть, находится внизу этого поста):

self.expensive_object = get_or_initialize_global(‘expensive_object’, self.__expensive_initialization)

Эксперименты

Следующие эксперименты были выполнены для задания, которое было настроено с использованием start_bundle и метода отложенной инициализации, описанного выше, с соответствующей регистрацией для индикации вызова.Различная пропускная способность была опубликована в соответствующей очереди, и результаты были записаны соответственно.

С частотой 1 мсг / с в течение 100 с:

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                             100 
LAZY INITIALIZATION                                     25 
TOTAL MESSAGES                                         100 

С частотой 10 мсг / с в течение более100 с

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                             942 
LAZY INITIALIZATION                                      3 
TOTAL MESSAGES                                        1000 

со скоростью 100 мсг / с в течение 100 с

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                            2447 
LAZY INITIALIZATION                                     30 
TOTAL MESSAGES                                       10000 

со скоростью 1000 мсг / с в течение 100 с

Context                              Number of Invocations 
------------------------------------------------------------ 
NEW BUNDLE                                            2293 
LAZY INITIALIZATION                                     36 
TOTAL MESSAGES                                      100000 

Takeaways

Хотя start_bundle хорошо работает для высокой пропускной способности, ленивая инициализация, тем не менее, наиболее эффективна с большим запасом независимо от пропускной способности.Это рекомендуемый способ выполнения дорогих инициализаций на Python Beam.Этот результат, возможно, не слишком удивителен, если принять во внимание цитату из официальных документов:

Setup - вызывается один раз для экземпляра DoFn, прежде чем что-либо еще;это не было реализовано в Python SDK, поэтому пользователь может обойтись только с помощью отложенной инициализации

Тот факт, что называется «обходом», не особенно обнадеживает, и, возможно, мы можем ожидатьчто-то более надежное в ближайшем будущем.

Примеры кода

Предоставлено Андреасом Янссоном:

def get_or_initialize_global(object_key, initialize_expensive_object):
    if object_key in globals():
        expensive_object = globals()[object_key]
    else:
        expensive_object = initialize_expensive_object()
        globals()[object_key] = expensive_object
0 голосов
/ 05 июля 2019

Установка и демонтаж теперь добавлены в Python SDK и являются рекомендуемым способом выполнения дорогой одноразовой инициализации в Beam Python DoFn.

0 голосов
/ 28 октября 2018
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...