Невозможно просмотреть выходные данные для beam.combiners.Count.PerElement () в потоке данных - PullRequest
0 голосов
/ 27 марта 2020

У меня есть сценарий Pub / Sub, публикующий мужские имена следующим образом:

from google.cloud import pubsub_v1
import names

project_id = "Your-Project-Name"
topic_name = "Your-Topic-Name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

while True:
    data = names.get_first_name(gender='male') #u"Message number {}".format(n)
    data = data.encode("utf-8")
    publisher.publish(topic_path, data=data)

Затем у меня есть поток данных, который читает подписку, прикрепленную к topi c, а затем подсчитывает каждый элемент конвейера. следующим образом:

import logging,re,os
import apache_beam as beam
from apache_beam.options.pipeline_options import  PipelineOptions

root = logging.getLogger()
root.setLevel(logging.INFO)

p = beam.Pipeline(options=PipelineOptions())
x = (
 p
 | beam.io.ReadFromPubSub(topic=None, subscription="projects/YOUR-PROJECT-NAME/subscriptions/YOUR-SUBSCRIPTION-NAME").with_output_types(bytes)
 | 'Decode_UTF-8' >> beam.Map(lambda x: x.decode('utf-8'))
 | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
 | 'CountingElem' >> beam.combiners.Count.PerElement()
 | 'FormatOutput' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
 | 'Printing2Log' >> beam.Map(lambda k: logging.info(k)))

result = p.run()
result.wait_until_finish()

Проблема заключается в том, что я не получаю никакого вывода с последних 3 шагов конвейера, в то время как я могу видеть данные, поступающие с первых 3 шагов конвейера - что означает, что ничего не регистрируется .

Я ожидал, что результат будет таким:

Peter: 2
Glen: 1
Alex: 1
Ryan: 2

Я уже благодарю вас за помощь

1 Ответ

1 голос
/ 28 марта 2020

Учитывая, что это потоковый конвейер, вам нужно настроить управление окнами / запуск соответствующим образом, чтобы конвейер работал. Смотрите следующее. https://beam.apache.org/documentation/programming-guide/#windowing

Более конкретно:

Внимание: оконное поведение Beam по умолчанию состоит в назначении всех элементов PCollection одному единственному глобальному окну и поздней отмене данные, даже для неограниченных PCollections. Прежде чем использовать групповое преобразование, например GroupByKey, для неограниченной коллекции PCollection, необходимо выполнить хотя бы одно из следующих действий:

beam.combiners.Count.PerElement() содержит GroupByKey.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...