Мне нужно удалить элементы в конвейере, которые соответствуют старым данным.то есть: данные до 1 января 2019 года. Когда я реализую это, преобразование в нисходящем направлении получает ошибку «NoneType», потому что фильтрация отправляет None, а не просто переходит к следующему элементу.Мне нужно иметь возможность отфильтровывать элементы данных и просто двигаться дальше, ничего не возвращая.
Это для конвейера Apache Beam, обрабатывающего потоковый набор данных из PubSub.Я попытался просто "передать", а также обработать ошибки и передать коллекцию сторон, которая может добавить логику if / else / pass.
def timedelay(element):...
def data_trim_enhance(element):
# Will filter out old sensor data
old_data_limit = datetime(2019, 1, 1, 0, 0, 0, 0)
date_to_compare = datetime.strptime(element.attributes['_last_updt'], '%Y-%m-%d %H:%M:%S.%f')
if date_to_compare < old_data_limit:
# print(date_to_compare)
pass
else:
return "some data"
def run():
chi_traffic = (pipeline
| 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
# [START window_and_trigger] - Set a fixed window at 10 minutes based on evnt_timestamp attribute
| 'TrafficFixedWindows' >> beam.WindowInto(beam.window.FixedWindows(10),
trigger=trigger.AfterWatermark(trigger.AfterCount(10)),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
# [END window_and_trigger] - Wait for AfterCount(x) pub/sub messages
| 'GeoEnrich&Trim' >> beam.Map(data_trim_enhance)
| 'TimeDelayEnrich' >> beam.Map(timedelay)
| 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
| 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=pvalue.AsDict(accident_data))
| 'WritetoBQ' >> beam.io.WriteToBigQuery(
table='{0}:{1}.{2}'.format(project_id, dataset_id, table_name),
schema=table_schema
))
После того, как окно сработает, я ожидаю, чтоследующее преобразование вниз (временная задержка), чтобы получить данные, обработанные из data_trim_enhance.Тем не менее, я получаю «Объект AttributeError: NoneType» не имеет атрибута * ».Я пытался справиться с этим с помощью различных функций, но в конечном итоге его просто пинали по дороге, пока он не достиг «WritetoBQ» и не выдавал ошибки на NoneType.Мне нужно просто пропустить старый элемент и ничего не отправлять.