Apache луч CombinePerKey (сумма) не суммируется правильно - PullRequest
0 голосов
/ 01 апреля 2020

Я хочу подсчитать вхождение каждого поля 'size' в мои данные:

counts = (
    lines
    | 'convert_to_dict' >> beam.Map(process_data)
    | 'Window' >> beam.WindowInto(window.FixedWindows(1*1),accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | 'GettinyCounty' >> beam.Map(lambda x: x['size'])
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'Sum' >> beam.CombinePerKey(sum))

Печать результатов на консоли

def format_result(size_count):
  (size, count) = size_count
  out = {}
  out["size"] = size
  out["count"] = count
  print("{}".format(out))
  return  str(out)

output = counts | 'format_result' >> beam.Map(format_result)
            | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)))

output | beam.io.WriteToPubSub(known_args.output_topic)

Вот что я получу. Я ожидал что-то вроде:

{'medium': 155}, {'small': 75}, ...}

enter image description here

...