Как прочитать файл с помощью pyspark и преобразовать его в фрейм данных? - PullRequest
0 голосов
/ 29 января 2020

Я очень новичок в spark, и я установил pyspark на linux машине.

У меня есть файл myFile, который выглядит следующим образом:

head -n10 myFile

5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.37716707876914,-71.06993765543464,17.422535604703803,1525319638,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.37746950289335,-71.06982208027628,17.683572759745804,1525319639,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.377572688752046,-71.06941706906532,22.2879345491413,1525319640,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.37760741715237,-71.06942602861176,19.110023956558077,1525319641,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.37724319426698,-71.06952117217712,18.904772942860852,1525319642,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.37825292800291,-71.06947776768372,20.772903906241773,1525319643,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.378007902513026,-71.06983132150934,18.084947927771996,1525319644,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.37869100406859,-71.07032634455157,15.64325967958824,1525319645,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.37872266154198,-71.07033504770575,21.09347764172144,1525319646,36.85362170935733
5ac52674ffff34c987885b6bf1812df2b728c3089adf113c8b5f143277a39498,IDFA,1,42.378682268943635,-71.07034500227158,21.851893849658023,1525319647,36.85362170935733

Я пытаюсь прочитать его и преобразовать в фрейм данных: это то, что я пытаюсь сделать.

from pyspark.sql.types import *

spark  = SparkSession.builder\
                  .master("local")\
                  .enableHiveSupport()\
                  .getOrCreate()

spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')
sc = spark.sparkContext

f3 = sc.textFile("myFile")

schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Code", StringType(), True),
    StructField("bool", IntegerType(), True),
    StructField("lat", FloatType(), True),
    StructField("lon", FloatType(), True),
    StructField("v1", FloatType(), True),
    StructField("v2", IntegerType(), True),
    StructField("v3", FloatType(), True)])
df3 = spark.createDataFrame(f3, schema)

, но если я пытаюсь преобразовать его

df = df3.toPandas().set_index('name').T.to_dict('list')

, я получаю следующую ошибку:

Py4JJavaError: An error occurred while calling o47.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/.sdkman/candidates/spark/2.4.4/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/.sdkman/candidates/spark/2.4.4/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

1 Ответ

1 голос
/ 29 января 2020

Считать файл как

>>> df=spark.read.format("csv").schema(schema).load("file path")
>>> df.show()
+--------------------+----+----+---------+----------+---------+----------+---------+
|                  ID|Code|bool|      lat|       lon|       v1|        v2|       v3|
+--------------------+----+----+---------+----------+---------+----------+---------+
|5ac52674ffff34c98...|IDFA|   1|42.377167| -71.06994|17.422535|1525319638|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37747|-71.069824|17.683573|1525319639|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37757| -71.06942|22.287935|1525319640|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37761| -71.06943|19.110023|1525319641|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.377243| -71.06952|18.904774|1525319642|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378254| -71.06948|20.772903|1525319643|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37801| -71.06983|18.084948|1525319644|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378693| -71.07033| 15.64326|1525319645|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378723|-71.070335|21.093477|1525319646|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37868| -71.07034|21.851894|1525319647|36.853622|
+--------------------+----+----+---------+----------+---------+----------+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...