Оптимизировать использование collect () - PullRequest
0 голосов
/ 19 сентября 2018

У меня есть рабочий код, но для выполнения задачи, которую мой локальный компьютер может выполнить за ~ 1 минуту, требуется 10 минут.Поэтому я думаю, что мой код нуждается в оптимизации, и я думаю, что я не использую Spark, особенно методы SQL limit() и collect(), правильно.

Мне нужно / нужно переместить мою проблему в Spark (pyspark),потому что наши старые инструменты и компьютеры не могут разумно обрабатывать огромное количество созданных файлов (и у них, очевидно, нет ресурсов для обработки некоторых из самых больших файлов, которые мы генерируем).

Я смотрю на файлы CSV и длякаждый файл, т.е. эксперимент, мне нужно знать, какой датчик был запущен первым / последним, и когда эти события произошли.

Сокращен до соответствующего кода Spark. Я делаю

tgl = dataframe.filter("<this line is relevant>") \
        .select(\
            substring_index(col("Name"),"Sensor <Model> <Revision> ", -1)\
                .alias("Sensors"),\
            col("Timestamp").astype('bigint'))\
        .groupBy("Sensors").agg(min("Timestamp"),max("Timestamp"))

point_in_time = tgl.orderBy("min(Timestamp)", ascending=True).limit(2).collect()
[...]
point_in_time = tgl.orderBy("min(Timestamp)", ascending=False).limit(1).collect()
[...]
point_in_time = tgl.orderBy("max(Timetamp)", ascending=True).limit(1).collect()
[...]
point_in_time = tgl.orderBy("max(Timestamp)", ascending=False).limit(2).collect()
[...]

Я сделал это такКстати, поскольку я где-то читал, что использование .limit() часто является более разумным выбором, поскольку не все данные будут собираться централизованно, что может занять довольно много времени, памяти и емкости сети.

Я проверяю свойкод с файлом размером 2,5 ГБ и длиной около 3E7 строк.Когда я смотрю на график обработки, я получаю следующее: Timeline of exemplary test run

Первое, на что стоит обратить внимание, это то, что на каждую задачу Spark уходит 1,1 минуты.Приведенный выше код отвечает за первые 4 проиллюстрированных вызова на collect().

Поскольку все четыре вызова используют один и тот же кадр данных, исходящий из filter().select().group().agg() Я бы подумал, что последующие три вызова будут намного быстрее, чемпервый.Очевидно, Spark не распознает это и каждый раз запускается с исходного кадра данных.Как я могу оптимизировать это, чтобы последующие три вызова на collect() извлекли выгоду из промежуточных результатов первого вызова на collect()?

1 Ответ

0 голосов
/ 20 сентября 2018

Ваше наблюдение о том, что искра повторно выполняет DAG каждый раз, верно, оно вытекает из очень простого факта, что искра ленива и что у искры есть 2 типа операций:

  1. преобразования: выберите,filter, groupBy, orderBy, withColumn и т. д., которые описывают, как преобразуется Dataframe / Dataset и вносит вклад в действия DAG
  2. : запись, сбор, подсчет и т. д., которые вызывают выполнение DAG

Фреймы данных не содержат данных, они являются своего рода виртуальным представлением, которое описывает преобразования входных данных.Один из способов не вызвать повторное выполнение DAG с каждым collect - это кэшировать tgl

tgl = dataframe.filter("<this line is relevant>") \
    .select(\
        substring_index(col("Name"),"Sensor <Model> <Revision> ", -1)\
            .alias("Sensors"),\
        col("Timestamp").astype('bigint'))\
    .groupBy("Sensors").agg(min("Timestamp"),max("Timestamp"))

 tgl.persist()

 point_in_time = tgl.orderBy("min(Timestamp)", ascending=True).limit(2).collect()
 [...]
 point_in_time = tgl.orderBy("min(Timestamp)", ascending=False).limit(1).collect()
 [...]
 point_in_time = tgl.orderBy("max(Timetamp)", ascending=True).limit(1).collect()
 [...]
 point_in_time = tgl.orderBy("max(Timestamp)", ascending=False).limit(2).collect()
 [...]

Это предотвратит повторное выполнение DAG, ноза кэширование tgl в ОЗУ будет установлена ​​цена, которая может свести на нет преимущества операции limit .Насколько это влияние, покажет только эксперимент.

В качестве альтернативы, если вы определите, на какие вопросы вы хотите получить ответ от своей программы, я мог бы попытаться помочь вам написать конкретный запрос или программу, на которую нужно ответить за один раз.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...