Кодировать столбец с целым числом в pyspark - PullRequest
0 голосов
/ 20 мая 2018

Мне нужно закодировать столбец в большой DataFrame в pyspark (spark 2.0).Все значения практически уникальны (около 1000 млн. Значений).Лучшим выбором может быть StringIndexer, но по какой-то причине он всегда терпит неудачу и убивает мою искровую сессию.Можно ли как-то написать такую ​​функцию:

id_dict() = dict()
def indexer(x):
    id_dict.setdefault(x, len(id_dict))
    return id_dict[x]

И сопоставить ее с DataFrame с помощью id_dict, сохраняющего items ()?Будет ли этот диктат синхронизироваться с каждым исполнителем?Все это мне нужно для предварительной обработки кортежей ('x', 3, 5) для модели ALS spark.mllib.Спасибо.

1 Ответ

0 голосов
/ 20 мая 2018

StringIndexer сохраняет все метки в памяти, поэтому, если значения почти уникальны, он просто не масштабируется.

Вы можете принимать уникальные значения, сортировать и добавлять id, что дорого, но более надежнов этом случае:

from pyspark.sql.functions import monotonically_increasing_id

df = spark.createDataFrame(["a", "b", "c", "a", "d"], "string").toDF("value")

indexer = (df.select("value").distinct()
  .orderBy("value")
  .withColumn("label", monotonically_increasing_id()))

df.join(indexer, ["value"]).show()
# +-----+-----------+
# |value|      label|
# +-----+-----------+
# |    d|25769803776|
# |    c|17179869184|
# |    b| 8589934592|
# |    a|          0|
# |    a|          0|
# +-----+-----------+

Обратите внимание, что метки не являются последовательными и могут отличаться от серии к серии или могут изменяться при изменении spark.sql.shuffle.partitions.Если это не приемлемо, вам придется использовать RDDs:

from operator import itemgetter

indexer = (df.select("value").distinct()
    .rdd.map(itemgetter(0)).zipWithIndex()
    .toDF(["value", "label"]))

df.join(indexer, ["value"]).show()
# +-----+-----+
# |value|label|
# +-----+-----+
# |    d|    0|
# |    c|    1|
# |    b|    2|
# |    a|    3|
# |    a|    3|
# +-----+-----+
...