I am new to Databricks and Spark. I am using the Databricks Community Edition with a Spark 2.4.5 cluster. I am trying to port code written for Spark 1.6.2 to Spark 2.4.5, because the Community Edition does not let me create a cluster with Spark 1.6.2. Can someone help me convert an RDD to a DataFrame in Spark 2.4.5?
CODE
summary = data.select("OrderMonthYear", "SaleAmount").groupBy("OrderMonthYear").sum().orderBy("OrderMonthYear") #.toDF("OrderMonthYear","SaleAmount")
# Convert OrderMonthYear to integer type
#results = summary.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount)).toDF(["OrderMonthYear","SaleAmount"])
rddData = summary.rdd.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount))
#rddData1 = rddData.flatMap(lambda x : [(k, x) for k in x.keys()])
# assuming the Spark environment is already set up and sc is spark.sparkContext
schemaPeople = spark.createDataFrame(rddData)
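For reference, a minimal self-contained sketch of the pattern I am trying to reproduce in Spark 2.4.5 (the sample rows and variable names here are made up for illustration, they are not from my notebook):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

spark = SparkSession.builder.getOrCreate()

# toy RDD of (OrderMonthYear, SaleAmount) tuples
toy_rdd = spark.sparkContext.parallelize([(201701, 100.0), (201702, 250.5)])

# explicit schema, so createDataFrame does not have to sample the RDD to infer types
toy_schema = StructType([
    StructField("OrderMonthYear", IntegerType(), False),
    StructField("SaleAmount", DoubleType(), True),
])

toy_df = spark.createDataFrame(toy_rdd, toy_schema)
toy_df.show()

My understanding is that createDataFrame(rdd, schema) is the supported way to do this in Spark 2.4.5, so I suspect the problem is in my lambda rather than in the conversion itself.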
The code that worked in Spark 1.6.2 is shown below:
#results = summary.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount)).toDF(["OrderMonthYear","SaleAmount"])
The code I modified for Spark 2.4.5, which does not work, is as follows:
rddData = summary.rdd.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount))
schemaPeople = spark.createDataFrame(rddData)
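As an alternative I am also considering skipping the RDD entirely and doing the conversion with DataFrame functions, roughly like this (only a sketch, assuming OrderMonthYear is a string such as '2017-01' and that the aggregated column keeps the default name sum(SaleAmount)):

from pyspark.sql.functions import regexp_replace, col

results = (summary
           .withColumn("OrderMonthYear",
                       regexp_replace(col("OrderMonthYear").cast("string"), "-", "").cast("int"))
           .withColumnRenamed("sum(SaleAmount)", "SaleAmount"))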
When trying to convert the RDD to a DataFrame, the error occurs on the following line:
schemaPeople = spark.createDataFrame(rddData)
ERROR

Thanks in advance.

Full error details:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 2821, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 480, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 472, in process
serializer.dump_stream(out_iter, outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 508, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<command-1212426270091216>", line 6, in <lambda>
TypeError: an integer is required (got type str)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:676)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:659)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:57)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2890)
at org.apache.spark.sql.Dataset$$anonfun$collectResult$1.apply(Dataset.scala:2881)
at org.apache.spark.sql.Dataset$$anonfun$collectResult$1.apply(Dataset.scala:2880)
at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3492)
at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3487)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:242)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:172)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3487)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2880)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:149)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:54)
at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBufferInternal$1.apply(PythonDriverLocal.scala:984)
at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBufferInternal$1.apply(PythonDriverLocal.scala:931)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:876)
at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:931)
at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:492)
at com.databricks.backend.daemon.driver.PythonDriverLocal.com$databricks$backend$daemon$driver$PythonDriverLocal$$outputSuccess(PythonDriverLocal.scala:918)
at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$6.apply(PythonDriverLocal.scala:364)
at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$6.apply(PythonDriverLocal.scala:351)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:876)
at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:351)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 480, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 472, in process
serializer.dump_stream(out_iter, outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 508, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "<command-1212426270091216>", line 6, in <lambda>
TypeError: an integer is required (got type str)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:676)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:659)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
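To narrow this down, here is a small diagnostic sketch I plan to run before the map, to see the schema Spark inferred and the concrete Python type the lambda receives. My suspicion is that OrderMonthYear may have been read as a date/timestamp rather than a string, in which case r.OrderMonthYear.replace('-', '') would call datetime.date.replace(), which expects integer arguments and, as far as I understand, raises exactly this "an integer is required (got type str)" message.

# inspect the column types Spark inferred for the aggregated DataFrame
summary.printSchema()

# look at one raw Row to see the concrete Python type the lambda receives
first_row = summary.first()
print(first_row)
print(type(first_row.OrderMonthYear))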