Есть ли способ сгруппировать записи по полям, выполняющим вычисления над другими полями в конвейере потока данных за период? - PullRequest
0 голосов
/ 24 октября 2019

Я пытаюсь создать потоковый конвейер с Dataflow, который читает сообщения из темы PubSub и записывает сгруппированные результаты в таблицу BigQuery. Я не хочу использовать какой-либо шаблон. На данный момент я просто хочу создать конвейер в скрипте Python3, выполняемом из экземпляра виртуальной машины Google, чтобы выполнить процесс преобразования данных, поступающих из Pubsub (структура сообщений соответствует ожидаемой в таблице). В этом процессе я хочу сгруппировать по полям «A» и «B» и рассчитать общее количество вхождений, сумму поля «C» и среднее значение поля «D».

Опубликованные сообщенияв теме PubSub:

{"A":"Alpha", "B":"V1", "C":3, "D":12}
{"A":"Alpha", "B":"V1", "C":5, "D":14}
{"A":"Alpha", "B":"V1", "C":3, "D":22}
{"A":"Beta", "B":"V1", "C":2, "D":6}
{"A":"Beta", "B":"V1", "C":7, "D":19}
{"A":"Beta", "B":"V2", "C":3, "D":10}
{"A":"Beta", "B":"V2", "C":5, "D":12}

Вывод с этими записями должен выглядеть примерно так:

{"A-B":"AlphaV1", "Occurs":3, "sum_C":11, "avg_D":16}
{"A-B":"BetaV1", "Occurs":2, "sum_C":9, "avg_D":12.5}
{"A-B":"BetaV2", "Occurs":2, "sum_C":8, "avg_D":11}

Как определить функцию в Apache Beam для того, чтобы сделатьэтот процесс?

Спасибо!

1 Ответ

3 голосов
/ 24 октября 2019

Вы можете сделать все это с помощью простого GroupByKey и пользовательского агрегирования. Есть один большой вопрос, который вам нужно будет обдумать: как вы хотите, чтобы ваши данные отображались в окне?

Вам нужно окно данных, потому что бегун должен выяснить, когда прекратить ждать больше данныхна том же ключе. Будем рады побольше поговорить об оконном управлении, если вы застряли с этим.

Вот как вы можете выполнить агрегацию, и мы просто "предполагаем" управление окнами:

def compute_keys(elm):
  key = '%s%s' % (elm.get('A'), elm.get('B'))
  return (key, elm)


def perform_aggregations_per_key(key_values):
  key, values = key_values
  values = list(values)  # This will load all values for a single key into memory!
  sum_C = sum(v['C'] for v in values)
  avg_D = sum(v['D'] for v in values) / len(values)
  occurs = len(values)
  return {'A-B': key,
          'Occurs': occurs,
          'sum_C': sum_C,
          'avg_D': avg_D}


my_inputs = (p | ReadFromPubSub(.....))

windowed_inputs = (my_inputs
                   | beam.WindowInto(....))  # You need to window your stream

result = (windowed_inputs
          | beam.Map(compute_keys)
          | beam.GroupByKey()
          | beam.Map(perform_aggregations_per_key))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...