В Spark ML, почему подгонка StringIndexer к столбцу с миллионами различных значений приводит к ошибке OOM? - PullRequest
0 голосов
/ 24 августа 2018

Я пытаюсь использовать функцию Spark StringIndexer для столбца, содержащего около 15 000 000 уникальных строковых значений.Независимо от того, сколько ресурсов я к нему добавляю, Spark всегда умирает от меня с каким-то исключением Out Of Memory.

from pyspark.ml.feature import StringIndexer

data = spark.read.parquet("s3://example/data-raw").select("user", "count")

user_indexer = StringIndexer(inputCol="user", outputCol="user_idx")

indexer_model = user_indexer.fit(data) # This never finishes

indexer_model \
    .transform(data) \
    .write.parquet("s3://example/data-indexed")

Файл драйвера генерируется с ошибкой, начало которой выглядело так:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 268435456 bytes for committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (os_linux.cpp:2657)

Теперь, если я попытаюсь вручную проиндексировать значения и сохранить их в фрейме данных, все будет работать как на шарме, и все это на нескольких работниках Amazon c3.2xlarge.

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

data = spark.read.parquet("s3://example/data-raw").select("user", "count")

uid_map = data \
    .select("user") \
    .distinct() \
    .select("user", row_number().over(Window.orderBy("user")).alias("user_idx"))

data.join(uid_map, "user", "inner").write.parquet("s3://example/data-indexed")

Мне бы очень хотелось использовать формальные трансформаторы, предоставляемые Spark, но в настоящее время это кажется невозможным.Любые идеи, как я могу сделать эту работу?

1 Ответ

0 голосов
/ 26 марта 2019

Причина, по которой вы получаете ошибку OOM, заключается в том, что за занавесом Spark StringIndexer вызывает countByValue в столбце «пользователь», чтобы получить все различные значения.

С 15M различными значениями вына самом деле создаем огромную Карту на драйвере, и в ней не хватает памяти ... Простым обходным решением было бы увеличение памяти драйвера.Если вы используете spark-submit, вы можете использовать --driver-memory 16g.Вы также можете использовать свойство spark.driver.memory в файле конфигурации.

Тем не менее, проблема просто возникнет снова, так как число различных значений увеличивается.К сожалению, вы мало что можете сделать с трансформаторами Spark, и вот почему.Фактически, после подгонки к данным, трансформаторы должны быть сериализованы для дальнейшего использования.Поэтому они не рассчитаны на такой большой размер (карта с 15M строками весит как минимум 100 МБ).Я думаю, что вам нужно пересмотреть использование StringIndexer для такого количества категорий.Возможно, здесь лучше использовать трюк с хэшированием.

Наконец, позвольте мне прокомментировать ваш обходной путь.С вашим окном вы фактически помещаете все свои 15M категорий в один раздел и, следовательно, в одного исполнителя.Он не будет масштабироваться, если это число увеличится.Кроме того, использование неразделенного окна, как правило, является плохой идеей, поскольку оно предотвращает параллельные вычисления (в дополнение к размещению всего в одном разделе, что может вызвать ошибку OOM).Я бы вычислил ваш uid_map так:

# if you don't need consecutive indices
uid_map = data\
    .select("user")\
    .distinct()\
    .withColumn("user_idx", monotonically_increasing_id())

# if you do, you need to use RDDs
uid_rdd = data\
    .select("user")\
    .distinct()\
    .rdd.map(lambda x : x["user"])\
    .zipWithIndex()
uid_map = spark.createDataFrame(uid_rdd, ["user", "user_idx"])
...