Я использую кластер EMR 5.18 с использованием типа экземпляра m3.2xlarge. Входные файлы имеют формат .gz, а размер файла не превышает 200 МБ. В кластере было 200 узлов, а общее количество входных файлов составляет около 2500-3k. Задание длится слишком долго, чтобы его выполнить, или иногда оно застревает на последнем этапе.
Во время работы моего приложения pyspark ниже приведены этапы, которые я могу видеть на консоли aws:
ID Status Description Duration stages Tasks
5 Running parquet at NativeMethodAccessorImpl.java:0 15 min 0 / 2 0 / 2,157
4 Succeeded run at ThreadPoolExecutor.java:1149 1 s 1 / 1 1 / 1
3 Succeeded json at NativeMethodAccessorImpl.java:0 15 min 1 / 1 1,957 / 1,957
2 Succeeded text at NativeMethodAccessorImpl.java:0 19 s 1 / 1 2,543 / 2,543
1 Succeeded load at NativeMethodAccessorImpl.java:0 2 s 1 / 1 2 / 2
0 Succeeded load at NativeMethodAccessorImpl.java:0 11 s 1 / 1 1 / 1
Фрагменты кода:
in_files = open("input.txt", "r")
logs = in_files.read().splitlines()
df= spark.read.text(logs)
df_rdd= df.rdd.flatMap(lambda row: function_name(row.value))
df = spark.read.json(df_rdd)
-----
Other transformation like adding removing columns, & perforing a join against csv data set
-----
df.write.parquet(out_path, mode='append', partitionBy='day')
Проблема, с которой я столкнулся, заключается в том, что работа слишком медленная, а самые медленные шаги - № 3 и № 5, упомянутые выше. Так может кто-нибудь помочь мне разобраться с различными шагами, отображаемыми в консоли AWS, и есть ли в моем коде что-нибудь, что я могу улучшить, чтобы эта работа выполнялась быстрее?
Я читал, что * .gz файлы не разделяются, так есть ли способ справиться с этим? Также, когда я случайно проверял некоторые узлы во время выполнения задания, загрузка ЦП работала на уровне 100%.
Пожалуйста, дайте мне знать, если потребуется какая-либо другая информация.