У меня довольно большая таблица (в ТБ) в Big Query, и ниже приведен пример данных, которые у меня есть
------------------------------------------------------
| date | id | no_items | unique_id |
-------------------------------------------------------
| 2018-01-01 | ifg | 4 | AA1 |
| 2018-01-01 | hui | 8 | AA1 |
| 2018-01-01 | bhi | 12 | AA2 |
| 2018-01-02 | hui | 3 | AA1 |
| 2018-01-02 | ifg | 10 | AA1 |
| 2018-01-02 | bhi | 5 | AA2 |
------------------------------------------------------
Я создал задание луча, которое читает и записывает в Avro. Я делаю GroupByKey, используя id и unique_id, а затем сравниваю no_items для каждой из дат. Если разница между no_items между двумя датами превышает 5. Я создаю новую коллекцию PC с новыми полями и одним дополнительным полем, которое принимает предыдущее из no_items. Но задание потока данных занимает много времени, чтобы выполнить его даже с 25 работниками и worker_machine_type как 'n1-highcpu-64'
Ниже представлен конвейер BEAM, который я написал
def sort_grouped_data(element):
key, value = element
value = list(value)
value.sort(key=lambda x: x[2], reverse=True)
return [element]
def final_segment(element):
key, value = element
value = list(value)
length = len(value)
for i in range (0, length):
if abs(value[length - i][1] - value[length - (i + 1)][1]) > 5:
segment = {"segment_id": segment_id, "segment_start_date": value[length - 1][2],
"segment_end_date": value[length - (i + 1)][2], "id": key[1], "unique_id": key[0],
"new_value": value[length - i][1]}
return [segment]
input_or_inputs(
| "get_key_tuple" >> beam.Map(lambda element: ((element["unique_id"], element["id"]),
[element["date"], element["no_itmes"]]))
| "group_by_id_unique_id" >> beam.GroupByKey()
| "sort_grouped_data" >> beam.ParDo(sort_grouped_data)
| "final_segment" >> beam.ParDo(final_segment)
)
Мне кажется, что работа с потоком данных идет медленно из-за двух причин
- Из-за сортировки
- Из-за сравнения каждого из элементов с огромными данными они становятся медленнее.
Может кто-нибудь помочь мне с этим? Можно ли оптимизировать код, чтобы он стал быстрее?