Оптимизировать фрагмент кода, который использует действие карты - PullRequest
0 голосов
/ 15 мая 2018

Следующий фрагмент кода занимает много времени на 4 ГБ необработанных данных в кластере:

df.select("type", "user_pk", "item_pk","timestamp")
      .withColumn("date",to_date(from_unixtime($"timestamp")))
      .filter($"date" > "2018-04-14")
      .select("type", "user_pk", "item_pk")
      .map {
        row => {
          val typef = row.get(0).toString
          val user = row.get(1).toString
          val item = row.get(2).toString
          (typef, user, item)
        }
      }

Выход должен быть типа Dataset[(String,String,String)].

Полагаю, что map часть занимает много времени. Есть ли способ оптимизировать этот кусок кода?

Ответы [ 2 ]

0 голосов
/ 15 мая 2018

Вы создаете date столбец с типом даты, а затем сравниваете его со строкой ?? Я бы предположил, что какое-то неявное преобразование происходит снизу (для каждой строки при фильтрации).

Вместо этого я преобразовал бы эту строку в дату в метку времени и сделал бы целочисленное сравнение (поскольку вы используете from_unixtime, я предполагаю, что метка времени хранится как System.currenttimemillis или подобный):

timestamp = some_to_timestamp_func("2018-04-14")
df.select("type", "user_pk", "item_pk","timestamp")
  .filter($"timestamp" > timestamp)
... etc
0 голосов
/ 15 мая 2018

Я серьезно сомневаюсь, что проблема map, тем не менее, я бы не стал использовать ее вообще и пошел бы со стандартным Dataset конвертером

import df.sparkSession.implicits._

df.select("type", "user_pk", "item_pk","timestamp")
  .withColumn("date",to_date(from_unixtime($"timestamp")))
  .filter($"date" > "2018-04-14")
  .select($"type" cast "string", $"user_pk" cast "string", $"item_pk" cast "string")
  .as[(String,String,String)]
...