Проблема с использованием функции .take () с pyspark spark 2+ - PullRequest
0 голосов
/ 19 апреля 2020

Это код, который я использую. Здесь он работает нормально без data.take, но выдает ошибку при использовании его в pyspark python

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
data = sc.textFile("re_u.data")
pData=data.take(2000)
ratings = pData.map(lambda l: l.split(','))\
.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

Дает ошибку

AttributeError                            Traceback (most recent call last)
<ipython-input-12-c9c51af1b2e9> in <module>
      2 data = sc.textFile("re_u.data")
      3 pData=data.take(2000)
----> 4 ratings = pData.map(lambda l: l.split(','))\
      5  .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

AttributeError: 'list' object has no attribute 'map'

Обновление: после использования вашего изменения @Hristo Iliev помогло но столкнулся с другой проблемой, за которой следовали рейтинги в виде списка. Спасибо за вашу помощь!

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
data = sc.textFile("re_u.data")
ratings = data.map(lambda l: l.split(','))\
  .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))\
  .take(2000)
rank = 20
numIterations = 20
model = ALS.train(ratings, rank, numIterations)

Дает ошибку

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-24-7e35afff970b> in <module>
      1 rank = 20
      2 numIterations = 20
----> 3 model = ALS.train(ratings, rank, numIterations)

C:\spark\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\mllib\recommendation.py in train(cls, ratings, rank, iterations, lambda_, blocks, nonnegative, seed)
    271           (default: None)
    272         """
--> 273         model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations,
    274                               lambda_, blocks, nonnegative, seed)
    275         return MatrixFactorizationModel(model)

C:\spark\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\mllib\recommendation.py in _prepare(cls, ratings)
    227         else:
    228             raise TypeError("Ratings should be represented by either an RDD or a DataFrame, "
--> 229                             "but got %s." % type(ratings))
    230         first = ratings.first()
    231         if isinstance(first, Rating):

TypeError: Ratings should be represented by either an RDD or a DataFrame, but got <class 'list'>.

Пожалуйста, помогите!

1 Ответ

0 голосов
/ 19 апреля 2020

take() - это действие, которое берет указанные числовые элементы с вершины СДР и передает их в программу драйвера. Из этого вы получите список Python с запрошенными элементами, который:

  • локально для драйвера, поэтому вы не должны брать слишком много элементов
  • не иметь метод map() просто потому, что класс Python list не имеет map() метода

Скорее всего, вы захотите сначала применить преобразования к data RDD и take() из преобразованного СДР:

data = sc.textFile("re_u.data")
ratings = data.map(lambda l: l.split(','))\
  .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))\
  .take(2000)

Вы получите список Rating экземпляров.

Поскольку вы передаете данные далее в ALS, которая принимает распределенные данные, то есть, СДР, а не локальный драйвер list, у вас есть три варианта:

  1. Снова распараллелить список, превратив его в СДР:

    data = sc.textFile("re_u.data")
    ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))\
      .take(2000)
    ratingsRDD = sc.parallelize(ratings)
    rank = 20
    numIterations = 20
    model = ALS.train(ratingsRDD, rank, numIterations)
    
  2. Используйте метод sample() для выборки подмножества данных в СДР:

    data = sc.textFile("re_u.data")
    ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))\
      .sample(False, 0.1, 42)
    rank = 20
    numIterations = 20
    model = ALS.train(ratings, rank, numIterations)
    

    Здесь sample(False, 0.1, 42) означает, что взять приблизительно 10% исходных данных и использовать 42 как семя генератора псевдослучайных чисел. Фиксация семян обеспечит воспроизводимость во время тестирования. Вам следует настроить 0.1 на правильное значение, чтобы получить около 2000 образцов. Обратите внимание, что эти образцы будут взяты из случайных мест внутри СДР и, скорее всего, будут не в первом 2000 году.

  3. Эмулировать take(), оставаясь в пределах области СДР:

    data = sc.textFile("re_u.data")
    ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))\
      .zipWithIndex()\
      .filter(lambda l: l[1] < 2000)\
      .map(lambda l: l[0])
    rank = 20
    numIterations = 20
    model = ALS.train(ratings, rank, numIterations)
    

    zipWithIndex() создает кортежи содержимого RDD, где первый элемент поступает из RDD, а второй - это индекс в RDD (по существу, номер строки). Затем можно отфильтровать только элементы с индексом меньше 2000, а затем избавиться от индекса, используя map(lambda l: l[0]).

Вероятно, наилучшим является метод 2.

...