Получение ошибки «TypeError: невозможно распаковать не повторяемый объект с плавающей точкой» после исключения сценария в ApacheSpark. Кто-нибудь может отладить мой код? - PullRequest
0 голосов
/ 14 апреля 2020

Привет, я следую за курсом Фрэнка Кейна в apachespark с python. Здесь я пытаюсь рассчитать общую сумму, потраченную различными клиентами. Я упоминал об ошибке ниже. Пожалуйста, помогите. ниже мой код:

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("MaxTemperatures")
sc = SparkContext(conf = conf)

def parseline(lines):
    fields=lines.split(',')
    customerId=int(fields[0])
    dollars=float(fields[2])
    return (customerId, dollars)

text = sc.textFile("file:///Sparkcourse/SparkCourse/customer-orders.csv")

rdd= text.map(parseline)
reduction= rdd.map(lambda x: x[1]).reduceByKey(lambda x,y: x+y)
sortedvalues=reduction.sortByKey()
final= sortedvalues.collect()
for i,j in final:
    print(i,j) 

TypeError: невозможно распаковать не повторяемый объект с плавающей точкой

1 Ответ

0 голосов
/ 14 апреля 2020

Я не уверен, что вы хотите сделать, и у меня есть интуиция, что у вас есть хотя бы одна ошибка в вашем коде: вы должны использовать reduceByKey следующим образом:

reduceByKey(lambda x, y: x + y)

Я думаю в вашем случае все шаги, которые вы хотите использовать, могут быть переведены в рамках DataFrame API, что будет проще в использовании. RDDs не самая простая структура для обработки, когда вы используете простые операции в качестве сумм и т. Д. c. (и DataFrames будет быстрее)

Так что я могу предложить вам нечто подобное. Возможно, вам придется изменить оператор schema, чтобы он соответствовал вашей структуре csv. Предполагая, что ваша spark сессия называется spark:

import pyspark.sql.types as pst
import pyspark.sql.functions as psf

schema = pst.StructType([
    pst.StructField("customerId", pst.IntegerType(), True),
    pst.StructField("dollars", pst.IntegerType(), True),
    pst.StructField("productid", pst.IntegerType(), True)])

(spark.read
   .csv("file:///Sparkcourse/SparkCourse/customer-orders.csv", header = False, schema = schema)
   .groupBy('customerId')
   .agg(psf.sum("dollars").alias("dollars"))
   .sortBy('dollars')
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...