Моя подписка pubsub pull отправляет сообщение и значение None для каждого сообщения. Мне нужно найти способ отфильтровать значения none как часть моей конвейерной обработки
Конечно, была бы полезна некоторая помощь, предотвращающая получение значений none из подписки по запросу. Но я чувствую, что что-то упускаю из общего рабочего процесса определения и применения функций через ParDo.
Я настроил функцию для фильтрации ни одного значения, которая, кажется, работает на основе проверки печати на консоль, однако при применении лямбда-функции, которая не работает ни на одном типе, я все еще получаю ошибки.
Я нашел документацию по Python Apache Beam SDK немного скудной, но я искал ответы там без особой удачи.
from __future__ import absolute_import
import argparse
import logging
from past.builtins import unicode
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
def print_row(row):
print row
print type(row)
def filter_out_nones(row):
if row is not None:
yield row
else:
print 'we found a none! get it out'
def run(argv=None):
pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
data = ['test1 message','test2 message',None,'test3 please work']
## this does seem to return only the values I would hope for based on the console log
testlogOnly = (p | "makeData" >> beam.Create(data)
| "filter" >> beam.ParDo(filter_out_nones)
| "printtesting" >> beam.Map(print_row))
# | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
# | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))
## testlogAndWrite = (p | "MakeWriteData" >> beam.Create(data)
# | "filterHere" >> beam.ParDo(filter_out_nones)
# | "printHere" >> beam.Map(print_row)
## below here does not work due to the following message
## AttributeError: 'NoneType' object has no attribute 'encode' [while running 'encodeHere']
# | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
# | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Если бы я мог записывать сообщения в кодировке байтовой строки без результатов none, я был бы там, где мне нужно.