У меня 10 узлов с 32 ядрами и 125 г каждый.У меня также есть фрейм данных с именем oldEmployee с двумя столбцами employeeName и его зарплатой.
df = ..
oldEmployee = df.rdd.map(lambda item:....)
mySchema = StructType([StructField("EmployeeName", StringType(), True),StructField("Salary", DoubleType(), True),])
oldEmployeeDF = spark.createDataFrame(oldEmployee, schema = mySchema)
Теперь у меня есть UpdateEmployee, созданный следующим образом:
d = df.rdd.flatMap(lambda ....)
mySchema = StructType([StructField("EmployeeName", StringType(), True),StructField("salary", DoubleType(), True),])
NewEmployeeDF = spark.createDataFrame(d, schema = mySchema)
Теперь я объединяю оба фрейма данных:
NewEmployeeDF = NewEmployeeDF.union(oldEmployeeDF)
Теперь я вычисляю сумму зарплаты для каждого сотрудника:
NewEmployeeDF.registerTempTable("df_table")
salaryDF = spark.sql("SELECT EmployeeName, round(SUM(salary),2) as salary FROM df_table GROUP BY EmployeeName")
Моя проблема заключается в следующем шаге, когда я хочу получить максимальную зарплату.Я делаю так:
maxSalary = salaryDF.agg({"salary": "max"}).collect()[0][0]
Эта строка кода занимает более 6 часов, и даже она не была закончена.В файле журнала я заметил, что после выполнения несколько раз с несколькими параметрами, что всегда номер раздела установлен равным 400, а выполнение достигает раздела 200/400 и останавливается:
19/04/22 06:46:27 INFO TaskSetManager: Finished task 53.0 in stage 37.0 (TID 4288) in 2017303 ms on 172.16.140.175 (executor 41) (199/400)
19/04/22 06:46:37 INFO TaskSetManager: Finished task 192.0 in stage 37.0 (TID 4427) in 2027473 ms on 172.16.140.254 (executor 1) (200/400)
Вы заметили, чтовремя для каждой задачи велико (2027473 ~ 33 минуты), я не понимаю, почему до сих пор
Во-первых, salaryDF действительно огромен, но как я могу решить эту проблему?Почему искровые перегородки salaryDF до 400 не 590, как я определил?Спасибо за ваше предложение
Во-вторых, как вы думаете, лучше использовать coalesce
вместо repartition
?
Обратите внимание, что переменная conf определяется следующим образом:
conf = (SparkConf()
#.setMaster("local[*]")
.setAppName(appName)
.set("spark.executor.memory", "18g")
.set('spark.driver.memory', '18g')
.set('spark.executor.memoryOverhead',"2g")
.set("spark.network.timeout", "800s")
#.set("spark.eventLog.enabled", True)
.set("spark.files.overwrite", "true")
.set("spark.executor.heartbeatInterval", "20s")
.set("spark.driver.maxResultSize", "1g")
.set("spark.executor.instances", 59)
.set("spark.executor.cores", 5)
.set("spark.driver.cores", 5)
.set("spark.default.parallelism", 590)# this takes effect only for RDD you must do repartition for dataframe
)
установив .set("spark.sql.shuffle.partitions", 590)
получаю следующее:
Traceback (most recent call last):
File "/home/moudi/main.py", line 778, in <module>
d= maxSalary.agg({"salary": "max"}).collect()[0][0]# the new added
File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 466, in collect
File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o829.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 37 (collect at /home/tamouze/main.py:778) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 19 at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867) at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863) at ...