Как сравнить все значения двух ключей в одной PCollection в python? - PullRequest
0 голосов
/ 13 февраля 2020

Я новичок в Apache Луч / поток данных. Я читаю таблицу BigQuery в Apache Beam и хочу сгруппировать по двум разным столбцам и сравнить все значения для двух разных ключей. Я создал кортеж из двух разных столбцов (идентификатор, дата), который действует как ключ. Ниже приведен пример данных в таблице

  ID         Date        P_id    position
  "abc"    2019-08-01   "rt56"      5
  "abc"    2019-08-01   "rt57"      6
  "abc"    2019-08-01   "rt58"      7
  "abc"    2019-08-02   "rt56"      2 
  "abc"    2019-08-02   "rt57"      4
  "abc"    2019-08-02   "rt58"      7

Теперь я хочу сравнить положение P_ids для пары ("ab c", 2019-08-01) и ("ab c). ", 2019-08-02) и посмотрите, будет ли изменена какая-либо позиция P_id, затем добавьте еще один столбец в таблице" status "с True. Моя новая таблица должна выглядеть примерно так:

Я пробую ее с кодом ниже

  ID         Date        P_id    position  Status
  "abc"    2019-08-01   "rt56"      5       False (as this is first date)
  "abc"    2019-08-01   "rt57"      6
  "abc"    2019-08-01   "rt58"      7
  "abc"    2019-08-02   "rt56"      2       True
  "abc"    2019-08-02   "rt57"      4
  "abc"    2019-08-02   "rt58"      7
(
p 
| "get_key_tuple" >> beam.ParDo(lambda element: tuple(element["Id"], element["Date]))
| "group_by" >> beam.GroupByKey()
| "compare_and_add_status" >> beam.ParDo(compare_pos)
)

Но я не знаю, как мне поступить для функции compare_pos ()

Было бы очень полезно получить некоторые идеи о том, как эффективно сравнить позицию и создать новый столбец, чтобы узнать статус, учитывая, что у меня очень большая таблица и много идентификаторов.

Ответы [ 2 ]

0 голосов
/ 14 февраля 2020

Возможно, я неправильно интерпретирую OP, но если предложение @robertwb не сработает, попробуйте, возможно, вместо этого сгруппировать следующее:

| "Create k, v tuple" >> beam.Map(
                    lambda elem: ((elem["P_id"], elem["ID"]), [elem["Date"], elem["position"]]))
| "Group by key" >> beam.GroupByKey()

, которая выведет следующую структуру:

(('rt56', 'abc'), [['2019-08-01', 5], ['2019-08-02', 2]])
(('rt57', 'abc'), [['2019-08-01', 6], ['2019-08-02', 4]])
(('rt58', 'abc'), [['2019-08-01', 7], ['2019-08-02', 7]])

Что должно позволить вам сравнивать каждый элемент в результирующей PCollection отдельно, вместо перекрестного сравнения между элементами в PCollection. Вероятно, это должно лучше соответствовать модели исполнения Beam, если я прав.

Это основано на моем предположении, что вы хотите проверить, изменилась ли позиция для данного P_id между двумя датами.

0 голосов
/ 13 февраля 2020

Beam's GroupByKey берет PCollection из 2-х кортежей и возвращает PCollection, где каждый элемент является 2-кортежем ключа и (неупорядоченным) итерацией всех значений, которые были связаны с этим ключом. Например, если ваша оригинальная коллекция имеет элементы

(k1, v1)
(k1, v2)
(k1, v3)
(k2, v4)

, результатом GroupByKey будет PCollection с такими элементами, как

(k1, [v1, v3, v2])
(k2, [v4])

В вашем случае ваши ключи и значения сами по себе кортежи. Таким образом, вы можете взять свою оригинальную коллекцию и применить Map(lambda elt: ((elt['Id'], elt['Date']), (elt['P_id'], elt['position']))), который даст вам PCollection с элементами

  ("abc", 2019-08-01),   ("rt56", 5)
  ("abc", 2019-08-01),   ("rt57", 6)
  ("abc", 2019-08-01),   ("rt58", 7)
  ("abc", 2019-08-02),   ("rt56", 2)
  ("abc", 2019-08-02),   ("rt57", 4)
  ("abc", 2019-08-02),   ("rt58", 7)

, который при применении GroupByKey станет

  ("abc", 2019-08-01),   [("rt56", 5), ("rt57", 6), ("rt58", 7)]
  ("abc", 2019-08-02),   [("rt56", 2), ("rt57", 4), ("rt58", 7)]

. Функция compare_pos может проверять все кортежи P_id, position, соответствующие заданной паре ID, Date, и выполнять любые логические операции c, необходимые для выдачи того, что необходимо изменить (с соответствующим ключом).

...