У меня есть искровое задание, которое выполняет чередование наименьших квадратов (ALS) на матрице рейтингов неявной обратной связи. Я создаю объект ALS следующим образом.
val als = new ALS()
.setCheckpointInterval(5)
.setRank(150)
.setAlpha(30.0)
.setMaxIter(25)
.setRegParam(0.001)
.setUserCol("userId")
.setItemCol("itemId")
.setRatingCol("rating")
.setImplicitPrefs(true)
.setIntermediateStorageLevel("MEMORY_ONLY")
.setFinalStorageLevel("MEMORY_ONLY")
Матрица рейтингов создается и используется для соответствия модели ALS следующим образом.
val ratingsSchema = StructType(Array(
StructField("userId", IntegerType, nullable = true),
StructField("itemId", IntegerType, nullable = true),
StructField("rating", DoubleType, nullable = true)))
val ratings = spark
.read
.format("parquet")
.schema(ratingsSchema)
.load("/ratings")
.cache()
val model = als.fit(ratings)
Существует около 150 миллионов уникальных пользователей и 1 миллион элементов в ratings
DataFrame, который содержит около 850 миллионов строк.
Исходя из приведенных выше цифр, рейтинги DataFrame должны занимать ~ 20 ГБ места в памяти при полной загрузке. UserFactors DataFrame будет иметь размер 150 МБ х 150 = 180 ГБ (примерно). Элемент DataFrame itemFactors должен иметь размер всего 1,2 ГБ.
На выполнение задания уходит очень много времени (15+ часов). Мои кластерные характеристики следующие.
Provider: AWS EMR version 5.14.0
Spark version: 2.3.0
Cluster:
1 MASTER node: m4.xlarge (8 cores, 16GB mem, 32GB storage)
2 CORE nodes: i3.xlarge (4 cores, 30GB mem, 950 GB storage)
20 TASK nodes: r4.4xlarge (16 cores, 122GB mem, 32 GB storage)
Total TASK cores = 320
Total TASK memory: 2440 GB
Исходя из приведенных выше цифр, все DF должны легко помещаться в памяти (при необходимости также доступны TB + HDFS).
Конфигурация задания:
--executor-memory 102g
--num-executors 20
--executor-cores 15
Я вижу, что работают 20 исполнителей (и драйвер тоже). Я пытался кэшировать рейтинги DF, а также без кэширования.
Как настроить систему, чтобы она работала быстрее, если это возможно?
Кто-нибудь знает о работе ALS ? Много ли тасует? Как мы можем минимизировать перемешивание?
Матрица рейтингов в формате parquet
с 200 файлами, хранящимися в корзине в S3.
Будет ли это работать лучше, если у меня будет много маленьких экземпляров (скажем, 50) или я получу несколько (скажем, 5) очень больших экземпляров, таких как r4.16xlarge (64 ядра, 488 ГБ памяти)?