У меня есть рабочий код, но для выполнения задачи, которую мой локальный компьютер может выполнить за ~ 1 минуту, требуется 10 минут.Поэтому я думаю, что мой код нуждается в оптимизации, и я думаю, что я не использую Spark, особенно методы SQL limit()
и collect()
, правильно.
Мне нужно / нужно переместить мою проблему в Spark (pyspark),потому что наши старые инструменты и компьютеры не могут разумно обрабатывать огромное количество созданных файлов (и у них, очевидно, нет ресурсов для обработки некоторых из самых больших файлов, которые мы генерируем).
Я смотрю на файлы CSV и длякаждый файл, т.е. эксперимент, мне нужно знать, какой датчик был запущен первым / последним, и когда эти события произошли.
Сокращен до соответствующего кода Spark. Я делаю
tgl = dataframe.filter("<this line is relevant>") \
.select(\
substring_index(col("Name"),"Sensor <Model> <Revision> ", -1)\
.alias("Sensors"),\
col("Timestamp").astype('bigint'))\
.groupBy("Sensors").agg(min("Timestamp"),max("Timestamp"))
point_in_time = tgl.orderBy("min(Timestamp)", ascending=True).limit(2).collect()
[...]
point_in_time = tgl.orderBy("min(Timestamp)", ascending=False).limit(1).collect()
[...]
point_in_time = tgl.orderBy("max(Timetamp)", ascending=True).limit(1).collect()
[...]
point_in_time = tgl.orderBy("max(Timestamp)", ascending=False).limit(2).collect()
[...]
Я сделал это такКстати, поскольку я где-то читал, что использование .limit()
часто является более разумным выбором, поскольку не все данные будут собираться централизованно, что может занять довольно много времени, памяти и емкости сети.
Я проверяю свойкод с файлом размером 2,5 ГБ и длиной около 3E7 строк.Когда я смотрю на график обработки, я получаю следующее:
Первое, на что стоит обратить внимание, это то, что на каждую задачу Spark уходит 1,1 минуты.Приведенный выше код отвечает за первые 4 проиллюстрированных вызова на collect()
.
Поскольку все четыре вызова используют один и тот же кадр данных, исходящий из filter().select().group().agg()
Я бы подумал, что последующие три вызова будут намного быстрее, чемпервый.Очевидно, Spark не распознает это и каждый раз запускается с исходного кадра данных.Как я могу оптимизировать это, чтобы последующие три вызова на collect()
извлекли выгоду из промежуточных результатов первого вызова на collect()
?