Apache Beam - среднее агрегирование для каждого ключа в PCollection - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть PCollection, которая состоит из столбца ID и семи столбцов значений.Есть несколько строк для каждого идентификатора.

Я хотел бы вычислить среднее значение для семи столбцов для каждого уникального идентификатора.

Есть ли способ достичь этого, не проходя программно через каждый элемент и не создавая пару ключ / значение для каждого элемента?

1 Ответ

0 голосов
/ 13 сентября 2018
table_pcoll = ....

def per_column_average(rows, ignore_elms=[ID_INDEX]):
  return [sum([row[idx] if idx not in ignore_elms else 0 
               for row in rows])/len(row[0]) 
          for idx, _ in enumerate(rows[0])]

keyed_averaged_elm = (table_pcoll 
                      | beam.Map(lambda x: (x[ID_INDEX], x))
                      | beam.GroupByKey()
                      | beam.Map(lambda x: (x[0], per_column_average(rows))

Извините за неприятную однострочность. Я надеюсь, что это помогает.

...