Как бороться с медленными заданиями Dataflow, которые используют GroupByKey? - PullRequest
1 голос
/ 06 апреля 2020

У меня довольно большая таблица (в ТБ) в Big Query, и ниже приведен пример данных, которые у меня есть

------------------------------------------------------
|   date       |   id     |   no_items |   unique_id  |
-------------------------------------------------------
|   2018-01-01 |    ifg   |      4     |    AA1       |
|   2018-01-01 |    hui   |      8     |    AA1       |
|   2018-01-01 |    bhi   |      12    |    AA2       |
|   2018-01-02 |    hui   |      3     |    AA1       |
|   2018-01-02 |    ifg   |      10    |    AA1       |
|   2018-01-02 |    bhi   |      5     |    AA2       |
------------------------------------------------------

Я создал задание луча, которое читает и записывает в Avro. Я делаю GroupByKey, используя id и unique_id, а затем сравниваю no_items для каждой из дат. Если разница между no_items между двумя датами превышает 5. Я создаю новую коллекцию PC с новыми полями и одним дополнительным полем, которое принимает предыдущее из no_items. Но задание потока данных занимает много времени, чтобы выполнить его даже с 25 работниками и worker_machine_type как 'n1-highcpu-64'

Ниже представлен конвейер BEAM, который я написал

def sort_grouped_data(element):
    key, value = element
    value = list(value)
    value.sort(key=lambda x: x[2], reverse=True)
    return [element]

def final_segment(element):
    key, value = element
    value = list(value)
    length = len(value)
    for i in range (0, length):
     if abs(value[length - i][1] - value[length - (i + 1)][1]) > 5:
            segment = {"segment_id": segment_id, "segment_start_date": value[length - 1][2],
                       "segment_end_date": value[length - (i + 1)][2], "id": key[1], "unique_id": key[0],
                       "new_value": value[length - i][1]}
    return [segment]


input_or_inputs(
            | "get_key_tuple" >> beam.Map(lambda element: ((element["unique_id"], element["id"]),
                                                           [element["date"], element["no_itmes"]]))
            | "group_by_id_unique_id" >> beam.GroupByKey()
            | "sort_grouped_data" >> beam.ParDo(sort_grouped_data)
            | "final_segment" >> beam.ParDo(final_segment)
)

Мне кажется, что работа с потоком данных идет медленно из-за двух причин

  1. Из-за сортировки
  2. Из-за сравнения каждого из элементов с огромными данными они становятся медленнее.

Может кто-нибудь помочь мне с этим? Можно ли оптимизировать код, чтобы он стал быстрее?

...