Я новичок в pyspark, у меня есть скрипт, как показано ниже;
joinedRatings=ratings.join(ratings)
joinedRatings.take(4)
и вывод:
[(196, ((242, 3.0), (242, 3.0))), (196, ((242, 3.0), (393, 4.0))), (196, ((242, 3.0), (381, 4.0))), (196, ((242, 3.0), (251, 3.0)))]
После этого у меня есть функция;
def filterDuplicates(userRatings):
ratings = userRatings[1]
(movie1, rating1) = ratings[0]
(movie2, rating2) = ratings[1]
return movie1 < movie2
чем у меня этот СДР
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)
Моя проблема в том, чтобы понять, как запустить эту функцию, которую я написал
joinedRatings[1]
Я получил сообщение об ошибке;
Fail to execute line 1: joinedRatings[1]
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-240579357005199320.py", line 380, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
TypeError: 'PipelinedRDD' object does not support indexing
Но он работает под функцией «def filterDuplicates (userRatings):» без каких-либо проблем, пожалуйста, дайте мне знать, как узнать значение «joinRatings [1]»?