Ошибка преобразования строки RDD в фрейм данных с помощью pyspark - PullRequest
0 голосов
/ 07 мая 2018

Я написал функцию, которую хочу применить к фрейму данных, но сначала мне нужно преобразовать фрейм данных в RDD для сопоставления. Затем я печатаю, чтобы увидеть результат:

x = exploded.rdd.map(lambda x: add_final_score(x.toDF()))
print(x.take(2))

Функция add_final_score принимает фрейм данных, поэтому мне нужно преобразовать x обратно в DF, прежде чем я его пропущу. Тем не менее, это дает мне эту ошибку, что toDF нет в списке:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-491-11e7b77ecf3f> in <module>()
     42 #                      StructField('segmentName', StringType(), True)])
     43 # x = exploded.rdd.map(lambda y: y.toDf())
---> 44 print(x.take(2))

~/spark-2.3.0-bin-hadoop2.7/python/pyspark/rdd.py in take(self, num)
   1356 
   1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
   1359 
   1360             items += res

~/spark-2.3.0-bin-hadoop2.7/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    999         # SparkContext#runJob.
   1000         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
   1003 

~/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

~/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 78.0 failed 1 times, most recent failure: Lost task 0.0 in stage 78.0 (TID 78, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/lisa/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1556, in _getattr_
    idx = self.__fields__.index(item)
ValueError: 'toDF' is not in list

Что это значит? Какой список?

1 Ответ

0 голосов
/ 07 мая 2018

"toDF" работает с DataFrame, как вы можете видеть здесь: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=todf#pyspark.sql.DataFrame.toDF

В вашем коде, я думаю, "взорвался" - это df, поэтому после того, как вы используете ".rdd", он становится rdd. Затем, когда вы используете «карту», ​​вы снова получаете rdd.

Вы не можете применить toDF к строкам rdd. Если вы хотите превратить rdd обратно в DataFrame, вам нужно использовать что-то вроде (в зависимости от вашей версии spark): https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=createdataframe#pyspark.sql.SparkSession.createDataFrame

Кроме того, вы не можете применить функцию к DataFrame, используя «map», потому что rdd не может содержать DataFrames, в которых есть строки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...