Улучшение вычислительной производительности Spark ML ALS - PullRequest
0 голосов
/ 28 июня 2018

У меня есть искровое задание, которое выполняет чередование наименьших квадратов (ALS) на матрице рейтингов неявной обратной связи. Я создаю объект ALS следующим образом.

val als = new ALS()
  .setCheckpointInterval(5)
  .setRank(150)
  .setAlpha(30.0)
  .setMaxIter(25)
  .setRegParam(0.001)
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("rating")
  .setImplicitPrefs(true)
  .setIntermediateStorageLevel("MEMORY_ONLY")
  .setFinalStorageLevel("MEMORY_ONLY")

Матрица рейтингов создается и используется для соответствия модели ALS следующим образом.

val ratingsSchema = StructType(Array(
  StructField("userId", IntegerType, nullable = true), 
  StructField("itemId", IntegerType, nullable = true),
  StructField("rating", DoubleType, nullable = true)))

val ratings = spark
  .read
  .format("parquet")
  .schema(ratingsSchema)
  .load("/ratings")
  .cache()

val model = als.fit(ratings)

Существует около 150 миллионов уникальных пользователей и 1 миллион элементов в ratings DataFrame, который содержит около 850 миллионов строк.

Исходя из приведенных выше цифр, рейтинги DataFrame должны занимать ~ 20 ГБ места в памяти при полной загрузке. UserFactors DataFrame будет иметь размер 150 МБ х 150 = 180 ГБ (примерно). Элемент DataFrame itemFactors должен иметь размер всего 1,2 ГБ.

На выполнение задания уходит очень много времени (15+ часов). Мои кластерные характеристики следующие.

Provider: AWS EMR version 5.14.0
Spark version: 2.3.0
Cluster:
  1 MASTER node: m4.xlarge (8 cores, 16GB mem, 32GB storage)
  2 CORE nodes: i3.xlarge (4 cores, 30GB mem, 950 GB storage)
  20 TASK nodes: r4.4xlarge (16 cores, 122GB mem, 32 GB storage)
  Total TASK cores = 320
  Total TASK memory: 2440 GB

Исходя из приведенных выше цифр, все DF должны легко помещаться в памяти (при необходимости также доступны TB + HDFS).

Конфигурация задания:

--executor-memory 102g 
--num-executors 20 
--executor-cores 15

Я вижу, что работают 20 исполнителей (и драйвер тоже). Я пытался кэшировать рейтинги DF, а также без кэширования.

Как настроить систему, чтобы она работала быстрее, если это возможно?

Кто-нибудь знает о работе ALS ? Много ли тасует? Как мы можем минимизировать перемешивание?

Матрица рейтингов в формате parquet с 200 файлами, хранящимися в корзине в S3.

Будет ли это работать лучше, если у меня будет много маленьких экземпляров (скажем, 50) или я получу несколько (скажем, 5) очень больших экземпляров, таких как r4.16xlarge (64 ядра, 488 ГБ памяти)?

...