Почему перераспределение не вступает в силу в огромном фрейме данных pyspark? - PullRequest
0 голосов
/ 22 апреля 2019

У меня 10 узлов с 32 ядрами и 125 г каждый.У меня также есть фрейм данных с именем oldEmployee с двумя столбцами employeeName и его зарплатой.

df = ..    
oldEmployee = df.rdd.map(lambda item:....)
    mySchema = StructType([StructField("EmployeeName", StringType(), True),StructField("Salary", DoubleType(), True),])
    oldEmployeeDF = spark.createDataFrame(oldEmployee, schema = mySchema)

Теперь у меня есть UpdateEmployee, созданный следующим образом:

d = df.rdd.flatMap(lambda ....)
mySchema = StructType([StructField("EmployeeName", StringType(), True),StructField("salary", DoubleType(), True),])
    NewEmployeeDF = spark.createDataFrame(d, schema = mySchema)

Теперь я объединяю оба фрейма данных:

NewEmployeeDF = NewEmployeeDF.union(oldEmployeeDF)

Теперь я вычисляю сумму зарплаты для каждого сотрудника:

NewEmployeeDF.registerTempTable("df_table")
salaryDF = spark.sql("SELECT EmployeeName, round(SUM(salary),2) as salary FROM df_table GROUP BY EmployeeName")

Моя проблема заключается в следующем шаге, когда я хочу получить максимальную зарплату.Я делаю так:

maxSalary = salaryDF.agg({"salary": "max"}).collect()[0][0]

Эта строка кода занимает более 6 часов, и даже она не была закончена.В файле журнала я заметил, что после выполнения несколько раз с несколькими параметрами, что всегда номер раздела установлен равным 400, а выполнение достигает раздела 200/400 и останавливается:

19/04/22 06:46:27 INFO TaskSetManager: Finished task 53.0 in stage 37.0 (TID 4288) in 2017303 ms on 172.16.140.175 (executor 41) (199/400)
19/04/22 06:46:37 INFO TaskSetManager: Finished task 192.0 in stage 37.0 (TID 4427) in 2027473 ms on 172.16.140.254 (executor 1) (200/400)

Вы заметили, чтовремя для каждой задачи велико (2027473 ~ 33 минуты), я не понимаю, почему до сих пор

Во-первых, salaryDF действительно огромен, но как я могу решить эту проблему?Почему искровые перегородки salaryDF до 400 не 590, как я определил?Спасибо за ваше предложение

Во-вторых, как вы думаете, лучше использовать coalesce вместо repartition?

Обратите внимание, что переменная conf определяется следующим образом:

conf = (SparkConf()
         #.setMaster("local[*]")
         .setAppName(appName)
         .set("spark.executor.memory", "18g")
         .set('spark.driver.memory', '18g')
         .set('spark.executor.memoryOverhead',"2g")
         .set("spark.network.timeout", "800s")
         #.set("spark.eventLog.enabled", True)
         .set("spark.files.overwrite", "true")
         .set("spark.executor.heartbeatInterval", "20s")
         .set("spark.driver.maxResultSize", "1g")
         .set("spark.executor.instances", 59)
         .set("spark.executor.cores", 5)
         .set("spark.driver.cores", 5)
         .set("spark.default.parallelism", 590)# this takes effect only for RDD you must do repartition for dataframe
         )

установив .set("spark.sql.shuffle.partitions", 590) получаю следующее:

Traceback (most recent call last):
  File "/home/moudi/main.py", line 778, in <module>
    d= maxSalary.agg({"salary": "max"}).collect()[0][0]# the new added
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 466, in collect
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o829.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 37 (collect at /home/tamouze/main.py:778) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 19    at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)    at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863)    at ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...