У меня есть примерно 100 ГБ данных, которые я пытаюсь обработать. Данные имеют вид:
| timestamp | social_profile_id | json_payload |
|-----------|-------------------|------------------|
| 123 | 1 | {"json":"stuff"} |
| 124 | 2 | {"json":"stuff"} |
| 125 | 3 | {"json":"stuff"} |
Я пытаюсь разбить этот фрейм данных на папки в S3
на social_profile_id
. Есть примерно 430 000 social_profile_id
с.
Я загрузил данные без проблем в набор данных. Однако, когда я пишу это, и пытаюсь разделить это, это занимает вечность! Вот что я попробовал:
messagesDS
.write
.partitionBy("socialProfileId")
.mode(sparkSaveMode)
Мне все равно, сколько файлов в каждой папке в конце работы. Моя теория состоит в том, что каждый узел может группироваться по social_profile_id
, а затем записывать в свою соответствующую папку без необходимости перемешивания или связи с другими узлами. Но этого не происходит, о чем свидетельствует долгое время работы. В идеале конечный результат должен выглядеть примерно так:
├── social_id_1 (only two partitions had id_1 data)
| ├── partition1_data.parquet
| └── partition3_data.parquet
├── social_id_2 (more partitions have this data in it)
| ├── partition3_data.parquet
| └── partition4_data.parquet
| ├── etc.
├── social_id_3
| ├── partition2_data.parquet
| └── partition4_data.parquet
| ├── etc.
├── etc.
Я пытался увеличить вычислительные ресурсы в несколько раз, увеличивая размеры экземпляров и количество экземпляров. То, что я смог увидеть из искрового пользовательского интерфейса, которое большую часть времени занимает операция записи. Кажется, что все исполнители используются, но они занимают абсурдно много времени, чтобы выполнить (например, 3-5 часов, чтобы написать ~ 150Mb). Любая помощь будет признательна! Извините, если я перепутал некоторые термины искры.