Это потому, что events
является PCollection
, поэтому вам необходимо применить к нему PTransform
.
Самый простой способ - применить ParDo
к events
:
events | 'Log results' >> beam.ParDo(LogResults())
который определяется как:
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("Pub/Sub event: %s", element)
yield element
Обратите внимание, что я также выдаю элемент в случае, если вы хотите применить дальнейшие шаги вниз по течению, такие как запись в приемник после регистрации элементов. См., Например, номер здесь .