Как отфильтровать элементы от нисходящих преобразований в конвейере, не отправляя NoneType? - PullRequest
0 голосов
/ 11 апреля 2019

Мне нужно удалить элементы в конвейере, которые соответствуют старым данным.то есть: данные до 1 января 2019 года. Когда я реализую это, преобразование в нисходящем направлении получает ошибку «NoneType», потому что фильтрация отправляет None, а не просто переходит к следующему элементу.Мне нужно иметь возможность отфильтровывать элементы данных и просто двигаться дальше, ничего не возвращая.

Это для конвейера Apache Beam, обрабатывающего потоковый набор данных из PubSub.Я попытался просто "передать", а также обработать ошибки и передать коллекцию сторон, которая может добавить логику if / else / pass.

def timedelay(element):...

def data_trim_enhance(element):
    # Will filter out old sensor data 
    old_data_limit = datetime(2019, 1, 1, 0, 0, 0, 0)
    date_to_compare = datetime.strptime(element.attributes['_last_updt'], '%Y-%m-%d %H:%M:%S.%f')

    if date_to_compare < old_data_limit:
        # print(date_to_compare)
        pass
    else:
        return "some data"

def run():

    chi_traffic = (pipeline
                   | 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
                   # [START window_and_trigger] - Set a fixed window at 10 minutes based on evnt_timestamp attribute
                   | 'TrafficFixedWindows' >> beam.WindowInto(beam.window.FixedWindows(10),
                                                              trigger=trigger.AfterWatermark(trigger.AfterCount(10)),
                                                              accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
                   # [END window_and_trigger] - Wait for AfterCount(x) pub/sub messages
                   | 'GeoEnrich&Trim' >> beam.Map(data_trim_enhance)
                   | 'TimeDelayEnrich' >> beam.Map(timedelay)
                   | 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
                   | 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=pvalue.AsDict(accident_data))
                   | 'WritetoBQ' >> beam.io.WriteToBigQuery(
                                                        table='{0}:{1}.{2}'.format(project_id, dataset_id, table_name),
                                                        schema=table_schema
                                                            ))

После того, как окно сработает, я ожидаю, чтоследующее преобразование вниз (временная задержка), чтобы получить данные, обработанные из data_trim_enhance.Тем не менее, я получаю «Объект AttributeError: NoneType» не имеет атрибута * ».Я пытался справиться с этим с помощью различных функций, но в конечном итоге его просто пинали по дороге, пока он не достиг «WritetoBQ» и не выдавал ошибки на NoneType.Мне нужно просто пропустить старый элемент и ничего не отправлять.

1 Ответ

1 голос
/ 12 апреля 2019

После дальнейших исследований я выяснил проблему. Суть проблемы в основном основана на сравнении Map и FlatMap.

  • Карта принимает один вход и ДОЛЖНА выдавать один выход.
  • FlatMap, однако, может принимать один выход, но возвращает 0 или больше элементов . Это означает, что он не помечает запись с NoneType в возврате. Следовательно, основная проблема была решена.

В своем коде я использовал преобразование Map для data_trim_enhance, когда это должен был быть FlatMap, поскольку я использую эту функцию для обрезки / фильтрации данных из источника и полностью отбрасываю их.

Я пробовал FlatMap раньше, но это искажало мой словарь. Чтобы решить эту проблему, я просто приложил словарь с обозначением списка.

Надеюсь, это кому-нибудь поможет!

...