I'm working with a large dataset on a standalone Spark setup. I'm still new to Spark (so my fundamentals may be a bit shaky), and I hope I'm not falling into an XY-problem trap...
The task is relatively simple in pandas, but I can't debug the PySpark error.
I have the following dataset:
+-------------+--------+----------+----------+--------------------+
| id|latitude| longitude| timestamp| categoryname|
+-------------+--------+----------+----------+--------------------+
|f69bfce8-a2c5|5.866167|118.088919|1551319828| null|
|b9d48e00-0e57|3.224278| 101.72999|1551445560| CONVENIENCE STORE|
|a6c5d9e2-1f99|3.148319| 101.61653|1551530554| RESTAURANTS|
|92988985-67e2| 1.54056| 110.31867|1551458606| null|
|e1771886-cb87|2.803712|101.663718|1551352028|                null|
+-------------+--------+----------+----------+--------------------+
Using a udf, I was able to compute the distance of each row from a single point with the haversine library:
from haversine import haversine
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Explicit return type; F.udf defaults to StringType otherwise.
distance_udf = F.udf(lambda lat1, lon1, lat2, lon2: haversine((lat1, lon1), (lat2, lon2)), DoubleType())
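Applied to the dataframe, it looks roughly like this (the reference point here is a placeholder for illustration, not my actual coordinates):

# Placeholder reference point; my real query uses different coordinates.
ref_lat, ref_lon = 3.1390, 101.6869
df = df.withColumn(
    'distance',
    distance_udf(F.col('latitude'), F.col('longitude'), F.lit(ref_lat), F.lit(ref_lon)))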
This gives me:
+-------------+-----------------+--------+----------+------------------+
| id| categoryname|latitude| longitude| distance|
+-------------+-----------------+--------+----------+------------------+
|f69bfce8-a2c5| null|5.866167|118.088919|1846.2724187047768|
|b9d48e00-0e57|CONVENIENCE STORE|3.224278| 101.72999|10.727485447625341|
|a6c5d9e2-1f99| RESTAURANTS|3.148319| 101.61653| 4.505927571918682|
|92988985-67e2| null| 1.54056| 110.31867| 979.531392507226|
|e1771886-cb87| null|2.803712|101.663718| 40.27783211167852|
+-------------+-----------------+--------+----------+------------------+
After .filter() and .drop() (sketched after the table below), I'm left with:
+-------------+--------------------+
| id| categoryname|
+-------------+--------------------+
|d05e2151-0fb9| null|
|8900e7dd-d51e| null|
|a1e712f9-0784|RESIDENTIAL BUILDING|
|5b2c6eb3-f13e| null|
|c7a05929-43fb| RESTAURANTS|
+-------------+--------------------+
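For reference, the filter and drop step looks roughly like this (the 100 km threshold is a placeholder, not my actual cutoff):

# Placeholder threshold; keep only the id and categoryname columns.
df = (df.filter(F.col('distance') < 100.0)
        .drop('latitude', 'longitude', 'timestamp', 'distance'))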
I'm trying to get a count of each categoryname. I tried df.groupby('categoryname').count() on the transformed dataframe and got an error.
I also tried converting it to an RDD and using .reduceByKey(), to no avail.
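Reconstructed from memory, the RDD attempt looked something like this (the exact code I ran may have differed slightly):

# Map each row to (categoryname, 1) and sum the counts per key.
counts = (df.rdd
            .map(lambda row: (row['categoryname'], 1))
            .reduceByKey(lambda a, b: a + b)
            .collect())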
What am I missing? Is it my setup? The dataset isn't that big; only 50 GB. groupby() works fine when I first load the dataset, but it seems to break after I've done a few transformations.
Can someone point me in the right direction?
EDIT:
Traceback (most recent call last):
File "C:\Users\Siddharth\Desktop\Uni\DataBooks\Movingwalls\sparkTest.py", line 52, in <module>
results = spark.sql('''SELECT count(DISTINCT idfa), categoryname FROM test2 GROUP BY categoryname''').show()
File "C:\Users\Siddharth\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 378, in show
print(self._jdf.showString(n, 20, vertical))
File "C:\Users\Siddharth\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\Siddharth\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\Siddharth\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o110.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 2, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
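For completeness, line 52 in the trace is the SQL variant of the same aggregation; I register the transformed dataframe as a temp view first, roughly like this (reconstructed from memory):

# 'test2' matches the table name in the query from the traceback.
df.createOrReplaceTempView('test2')
results = spark.sql('''SELECT count(DISTINCT idfa), categoryname FROM test2 GROUP BY categoryname''').show()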