Используя PySpark, я хочу получить максимальное значение среди всех ключей пары RDD.Данные базового СДР имеют вид: Social_Context.take(10):
[(1008044337136001024, 0.9343283582089552),
(1008044334510428160, 3.103463393248575),
(1008044334413852677, 0.7622047244094489),
(1008044333260509185, 0.493006993006993),
(1008044331641593856, 1.6094069529652353),
(1008044329062092801, 0.481981981981982),
(1008044326675460096, 1.3606889564336373),
(1008044325710782469, 0.7228464419475655),
(1008044323370295296, 0.46547314578005117),
(1008044320757354497, 353.8944618599791)]
Тип ключа int
и тип значения float
.Пытаясь получить максимальное значение среди всех клавиш, которые я сделал:
Social_Context_MAX = Social_Context.map(lambda x : x[1]).max()
print(Social_Context_MAX)
Но я получил ошибку:
TypeError: 'float' object has no attribute '__getitem__'
Я такжепытался использовать Spark DataFrames.
Social_Context_MAX = Social_Context.toDF(["id", "value"])
print(Social_Context_MAX.agg({"value": "max"}).collect()[0][0])
Но снова я получил ошибку:
raise TypeError("Unexpected obj type: %s" % type(obj))
TypeError: Unexpected obj type: <type 'float'>
َ Обновление : я прочитал основные данныеиз файла JSON в dataframe и после этого я конвертирую его в RDD.Вот коды более подробно:
raw_data = spark.read.json("../input/Spark_tweets.json")
selected_data = raw_data.select("full_text", "id", "retweet_count", "user", "created_at", "entities")\
.withColumn('verified', udf(getVerified)(raw_data.user))\
.withColumn('followers_count', udf(getFollowerCount)(raw_data.user))\
.withColumn('friends_count', udf(getFriendsCount)(raw_data.user))\
.withColumn("hashtags", udf(getHashtags)(raw_data.entities))\
.drop('user')\
.drop('entities')
Social_Context = selected_data.rdd.map(lambda row : getSocialContext(row))
Social_Context_MAX = Social_Context.map(lambda x : x[1])
print Social_Context_MAX.max(key = lambda x : x[0])
def getSocialContext(row):
A = int(row[2])
B = int(row[5])
C = float(row[6])
if C == 0:
return Default_Social_Context
if (A > 0):
res = (row[1], B * A / C)
return res
else:
res = (row[1], B / C)
return res
Работает хорошо, кроме случаев, когда я пытаюсь получить максимум Social_Context
.