Если для конвейера потока данных назначено несколько рабочих, смогут ли они получить доступ к глобальным переменным? - PullRequest
0 голосов
/ 20 июня 2019

В основном я запускаю конвейер в облачном потоке данных Google, используя apache beam python SDK.Во-первых, я читаю строку JSON из облачного pubsub и проверяю по глобальной переменной словаря, были ли данные уже получены с идентификатором.Если это первое сообщение с идентификатором, то я добавляю идентификатор в качестве ключа к своему словарю, а сообщение получаю как значение, в противном случае я не изменяю словарь.По сути, я добавляю ключ в словарь каждый раз, когда получаю новый.Затем я отслеживаю изменения в данных, сравнивая вновь полученные данные с последним чтением.

Не вызовет ли использование общей переменной каких-либо проблем, если число работников, обрабатывающих задание потока данных, станет больше 1?

Способ, которым я сейчас написал, работает, но сейчас только один работник обрабатывает задание потока данных gcp.Я не уверен, возникнут ли какие-либо проблемы, если назначен другой работник.

Здесь я добавил упрощенную версию кода, но в самом коде есть несколько ветвей, проверяющих различные виды событий.

dictionary={}
class AddId2Dict(beam.DoFn):
    def process(self,e):
        if(e[0] not in dictionary.keys()):
            dictionary[e[0]]=e[1]
        return((e,))
class ChangeChecker(beam.DoFn):
    def process(self,e):
       if(e[0] in dictionary.keys()):
            if dictionary[e[0]]<e[1]:
                print 'Increase occurred for id:'+str(e[0])|
                dictionary[e[0]]=e[1]
            elif dictionary[e[0]]>e[1]:
                print 'Decrease occurred for id:'+str(e[0])
            else:
                print 'Stayed constant for id:'+str(e[0])

def run():
    p = beam.Pipeline(options=options)
    (
     p
     | 'read from pubsub'<<beam.io.ReadFromPubSub(topic=topic_name).with_output_type(bytes)
     | 'parse json & create tuple' >> beam.Map(lambda e: ((json.loads(x)['id'],int(json.loads(x)['data'])))
     | 'add key to dict if it does not exist' >> beam.ParDo(AddId2Dict())
     | 'check for event' >> beam.ParDo(ChangeChecker())
    )
    result = p.run()
    result.wait_until_finish()

if __name__  ==  '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

1 Ответ

0 голосов
/ 20 июня 2019

Нет. Каждый работник создаст свою собственную копию переменной dictionary.

Если вы хотите, чтобы работники делили глобальное состояние, вы можете использовать API Beam State. В Python это модуль userstate (https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.transforms.userstate.html).

...