Beam использует PCollection
, параллельную коллекцию, которую в Python вы можете рассматривать как (обычно) список элементов (обычно кортежей или диктов).
В вашем случае это может быть список "строк", поэтому вы должны
- извлечь ключ строки.Если бы это было
User_id
, тогда отобразите что-то вроде этой лямбды, например
x -> (x[0], x)
Обратите внимание, что x используется в качестве значения в паре k, v, ион также по-прежнему содержит ключ, но это нормально, если вы хотите, вы можете удалить его и перепаковать кортеж значения без него.т.е. этот возвращаемый кортеж будет похож на тип Tuple [str, Tuple [int, int, int, float, float]], предполагая, что это правильные типы User_id, product_id, year, количество, цена
применить окно
сгруппировать по ключу (очень важно определить окно перед группировкой по ключу и знать, что окно вступает в силу только тогда, когда происходит группировка по ключу)
используйте что-то для извлечения интересующих вас столбцов (значений в кортеже), применения агрегатов и перепаковки для всего, что находится ниже по течению.
Itкажется странным использовать функцию агрегирования для одного значения кортежа, но агрегация будет отображаться / применяться ко всей группе ключей в окне.
Этот базовый пример можно легко расширить https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/sdks/python/apache_beam/examples/wordcount.py#L99
Если вам нужно сделать что-то простое, просто сопоставьте функцию, если вам нужно больше, чем просто, вы можете создать DoFn.Это просто.
например, ! Предупреждение, непроверенный код, написанный при передаче!
def multi_agg(element):
(key, row ) = element
return (key, (max(row[3]), min(row[4])))
В этом случае я взял user_id в качестве ключа отпредыдущий шаг, максимальное количество и минимальная цена, затем упаковали его обратно в кортеж из k, v пар.Пара k, v - это кортеж, который является элементом нисходящей коллекции PC.Основная причина, по которой вам нужны пары k, v, заключается в том, что такие вещи, как GroupByKey
, неявно используют первое значение в качестве ключа для группировки.Весь элемент неявно используется в качестве значения для сопоставления с функцией.Эти две вещи не очевидны при рассмотрении примеров Apache Beam.
Вы можете либо перепаковать в пару ak, v для дальнейшей обработки в нисходящем направлении, либо поместить в структуру, готовую для записи, например, в bigquery, bigtable или, возможно,файл в облачном хранилище.В любом случае это отличная идея - использовать подсказки типа.