Beam's GroupByKey берет PCollection из 2-х кортежей и возвращает PCollection, где каждый элемент является 2-кортежем ключа и (неупорядоченным) итерацией всех значений, которые были связаны с этим ключом. Например, если ваша оригинальная коллекция имеет элементы
(k1, v1)
(k1, v2)
(k1, v3)
(k2, v4)
, результатом GroupByKey будет PCollection с такими элементами, как
(k1, [v1, v3, v2])
(k2, [v4])
В вашем случае ваши ключи и значения сами по себе кортежи. Таким образом, вы можете взять свою оригинальную коллекцию и применить Map(lambda elt: ((elt['Id'], elt['Date']), (elt['P_id'], elt['position'])))
, который даст вам PCollection с элементами
("abc", 2019-08-01), ("rt56", 5)
("abc", 2019-08-01), ("rt57", 6)
("abc", 2019-08-01), ("rt58", 7)
("abc", 2019-08-02), ("rt56", 2)
("abc", 2019-08-02), ("rt57", 4)
("abc", 2019-08-02), ("rt58", 7)
, который при применении GroupByKey станет
("abc", 2019-08-01), [("rt56", 5), ("rt57", 6), ("rt58", 7)]
("abc", 2019-08-02), [("rt56", 2), ("rt57", 4), ("rt58", 7)]
. Функция compare_pos
может проверять все кортежи P_id, position
, соответствующие заданной паре ID, Date
, и выполнять любые логические операции c, необходимые для выдачи того, что необходимо изменить (с соответствующим ключом).