Как отфильтровать нет значений из PCollection - PullRequest
0 голосов
/ 15 мая 2019

Моя подписка pubsub pull отправляет сообщение и значение None для каждого сообщения. Мне нужно найти способ отфильтровать значения none как часть моей конвейерной обработки

Конечно, была бы полезна некоторая помощь, предотвращающая получение значений none из подписки по запросу. Но я чувствую, что что-то упускаю из общего рабочего процесса определения и применения функций через ParDo.

Я настроил функцию для фильтрации ни одного значения, которая, кажется, работает на основе проверки печати на консоль, однако при применении лямбда-функции, которая не работает ни на одном типе, я все еще получаю ошибки.

Я нашел документацию по Python Apache Beam SDK немного скудной, но я искал ответы там без особой удачи.

from __future__ import absolute_import

import argparse
import logging

from past.builtins import unicode

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions



def print_row(row):
    print row
    print type(row)

def filter_out_nones(row):
  if row is not None:
    yield row
  else:
    print 'we found a none! get it out'


def run(argv=None):
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True


    p = beam.Pipeline(options=pipeline_options)



    data = ['test1 message','test2 message',None,'test3 please work']

## this does seem to return only the values I would hope for based on the console log

    testlogOnly = (p | "makeData" >> beam.Create(data)
               | "filter" >> beam.ParDo(filter_out_nones)
               | "printtesting" >> beam.Map(print_row))
            #  | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
            #  | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))


##    testlogAndWrite = (p | "MakeWriteData" >> beam.Create(data)
            #  | "filterHere" >> beam.ParDo(filter_out_nones)
            #   | "printHere" >> beam.Map(print_row)
## below here does not work due to the following message
## AttributeError: 'NoneType' object has no attribute 'encode' [while running 'encodeHere']
            #   | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
            # | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Если бы я мог записывать сообщения в кодировке байтовой строки без результатов none, я был бы там, где мне нужно.

1 Ответ

0 голосов
/ 17 мая 2019

Ваш подход к фильтрации None значений выглядит хорошо для меня.

Однако, если я правильно понимаю, когда вы используете testlogAndWrite и получаете AttributeError, вы сохраняете "printHere" >> beam.Map(print_row) шаг в конвейере.

print_row читает сообщения и печатает их, но ничего не выводит.Следовательно, для следующего шага не будет ввода encode_here.

Чтобы решить эту проблему, вы можете закомментировать этот шаг или убедиться, что возвращен каждый элемент:

def print_row(row):
    print row
    print type(row)
    return row

Вывод:

test1 message
<type 'str'>
test2 message
<type 'str'>
we found a none! get it out
test3 please work
<type 'str'>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...