Как применить агрегатные функции к нескольким столбцам на основе одного ключа в луче Apache? - PullRequest
0 голосов
/ 05 марта 2019

Я использую Apache beam python SDK и работаю над потоком данных GCP.Как применить агрегатные функции к нескольким столбцам на основе одного ключа?Например, набор данных из 10 столбцов, где мои данные выглядят как User_id,product_id,year,quantity,price,... 101,1,2018,10,15,... 101,2,2019,1,10,... 102,1,2019,2,16,...

Для каждого идентификатора пользователя. Как рассчитать количество приобретенных им отдельных товаров, максимум (количество), минимум (цена) и т. Д..

Я видел примеры подсчета слов и т. Д., Где вы можете применить сумму к значениям в паре (ключ, значение).Что делать, если я хочу сделать разные преобразования для разных столбцов, таких как сумма / среднее / число и т. Д.

1 Ответ

0 голосов
/ 05 марта 2019

Beam использует PCollection, параллельную коллекцию, которую в Python вы можете рассматривать как (обычно) список элементов (обычно кортежей или диктов).

В вашем случае это может быть список "строк", поэтому вы должны

  1. извлечь ключ строки.Если бы это было User_id, тогда отобразите что-то вроде этой лямбды, например

x -> (x[0], x)

Обратите внимание, что x используется в качестве значения в паре k, v, ион также по-прежнему содержит ключ, но это нормально, если вы хотите, вы можете удалить его и перепаковать кортеж значения без него.т.е. этот возвращаемый кортеж будет похож на тип Tuple [str, Tuple [int, int, int, float, float]], предполагая, что это правильные типы User_id, product_id, year, количество, цена

применить окно

сгруппировать по ключу (очень важно определить окно перед группировкой по ключу и знать, что окно вступает в силу только тогда, когда происходит группировка по ключу)

используйте что-то для извлечения интересующих вас столбцов (значений в кортеже), применения агрегатов и перепаковки для всего, что находится ниже по течению.

Itкажется странным использовать функцию агрегирования для одного значения кортежа, но агрегация будет отображаться / применяться ко всей группе ключей в окне.

Этот базовый пример можно легко расширить https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/sdks/python/apache_beam/examples/wordcount.py#L99

Если вам нужно сделать что-то простое, просто сопоставьте функцию, если вам нужно больше, чем просто, вы можете создать DoFn.Это просто.

например, ! Предупреждение, непроверенный код, написанный при передаче!

def multi_agg(element):
    (key, row ) = element
    return (key, (max(row[3]), min(row[4])))

В этом случае я взял user_id в качестве ключа отпредыдущий шаг, максимальное количество и минимальная цена, затем упаковали его обратно в кортеж из k, v пар.Пара k, v - это кортеж, который является элементом нисходящей коллекции PC.Основная причина, по которой вам нужны пары k, v, заключается в том, что такие вещи, как GroupByKey, неявно используют первое значение в качестве ключа для группировки.Весь элемент неявно используется в качестве значения для сопоставления с функцией.Эти две вещи не очевидны при рассмотрении примеров Apache Beam.

Вы можете либо перепаковать в пару ak, v для дальнейшей обработки в нисходящем направлении, либо поместить в структуру, готовую для записи, например, в bigquery, bigtable или, возможно,файл в облачном хранилище.В любом случае это отличная идея - использовать подсказки типа.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...