Я хочу подсчитать вхождение каждого поля 'size' в мои данные:
counts = (
lines
| 'convert_to_dict' >> beam.Map(process_data)
| 'Window' >> beam.WindowInto(window.FixedWindows(1*1),accumulation_mode=trigger.AccumulationMode.DISCARDING)
| 'GettinyCounty' >> beam.Map(lambda x: x['size'])
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'Sum' >> beam.CombinePerKey(sum))
Печать результатов на консоли
def format_result(size_count):
(size, count) = size_count
out = {}
out["size"] = size
out["count"] = count
print("{}".format(out))
return str(out)
output = counts | 'format_result' >> beam.Map(format_result)
| 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)))
output | beam.io.WriteToPubSub(known_args.output_topic)
Вот что я получу. Я ожидал что-то вроде:
{'medium': 155}, {'small': 75}, ...}