Причина, по которой вы получаете ошибку OOM, заключается в том, что за занавесом Spark StringIndexer
вызывает countByValue
в столбце «пользователь», чтобы получить все различные значения.
С 15M различными значениями вына самом деле создаем огромную Карту на драйвере, и в ней не хватает памяти ... Простым обходным решением было бы увеличение памяти драйвера.Если вы используете spark-submit, вы можете использовать --driver-memory 16g
.Вы также можете использовать свойство spark.driver.memory
в файле конфигурации.
Тем не менее, проблема просто возникнет снова, так как число различных значений увеличивается.К сожалению, вы мало что можете сделать с трансформаторами Spark, и вот почему.Фактически, после подгонки к данным, трансформаторы должны быть сериализованы для дальнейшего использования.Поэтому они не рассчитаны на такой большой размер (карта с 15M строками весит как минимум 100 МБ).Я думаю, что вам нужно пересмотреть использование StringIndexer для такого количества категорий.Возможно, здесь лучше использовать трюк с хэшированием.
Наконец, позвольте мне прокомментировать ваш обходной путь.С вашим окном вы фактически помещаете все свои 15M категорий в один раздел и, следовательно, в одного исполнителя.Он не будет масштабироваться, если это число увеличится.Кроме того, использование неразделенного окна, как правило, является плохой идеей, поскольку оно предотвращает параллельные вычисления (в дополнение к размещению всего в одном разделе, что может вызвать ошибку OOM).Я бы вычислил ваш uid_map
так:
# if you don't need consecutive indices
uid_map = data\
.select("user")\
.distinct()\
.withColumn("user_idx", monotonically_increasing_id())
# if you do, you need to use RDDs
uid_rdd = data\
.select("user")\
.distinct()\
.rdd.map(lambda x : x["user"])\
.zipWithIndex()
uid_map = spark.createDataFrame(uid_rdd, ["user", "user_idx"])