How to convert an RDD to a DataFrame in Spark 2.4.5 (Python) - PullRequest
0 votes
/ 01 May 2020



I'm a newbie to Databricks and Spark. I'm using the Databricks Community Edition with a Spark 2.4.5 cluster. I have been trying to port code written for Spark 1.6.2 to Spark 2.4.5, since the Community Edition does not allow creating a cluster with Spark 1.6.2. Can someone help me convert an RDD object to a DataFrame in Spark 2.4.5?


CODE

summary = data.select("OrderMonthYear", "SaleAmount").groupBy("OrderMonthYear").sum().orderBy("OrderMonthYear") #.toDF("OrderMonthYear","SaleAmount")
# Convert OrderMonthYear to integer type
#results = summary.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount)).toDF(["OrderMonthYear","SaleAmount"])
rddData = summary.rdd.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount))
#rddData1 = rddData.flatMap(lambda x : [(k, x) for k in x.keys()])

# assuming the Spark environment is set and sc is spark.sparkContext
schemaPeople = spark.createDataFrame(rddData)

The code that worked in Spark 1.6.2 is shown below:

#results = summary.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount)).toDF(["OrderMonthYear","SaleAmount"])

The code I modified, which does not work, is as follows:

rddData = summary.rdd.map(lambda r: (int(r.OrderMonthYear.replace('-','')), r.SaleAmount))
schemaPeople = spark.createDataFrame(rddData) 


The error occurs on the following line, when trying to convert the RDD to a DataFrame:

schemaPeople = spark.createDataFrame(rddData) 




Thanks

Full error details:

SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 2821, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 508, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-1212426270091216>", line 6, in <lambda>
TypeError: an integer is required (got type str)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:676)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:659)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 2821, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 508, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-1212426270091216>", line 6, in <lambda>
TypeError: an integer is required (got type str)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:676)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:659)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:57)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2890)
    at org.apache.spark.sql.Dataset$$anonfun$collectResult$1.apply(Dataset.scala:2881)
    at org.apache.spark.sql.Dataset$$anonfun$collectResult$1.apply(Dataset.scala:2880)
    at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3492)
    at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3487)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:113)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:242)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:99)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:172)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3487)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2880)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:149)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:54)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBufferInternal$1.apply(PythonDriverLocal.scala:984)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBufferInternal$1.apply(PythonDriverLocal.scala:931)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:876)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:931)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:492)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.com$databricks$backend$daemon$driver$PythonDriverLocal$$outputSuccess(PythonDriverLocal.scala:918)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$6.apply(PythonDriverLocal.scala:364)
    at com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl$6.apply(PythonDriverLocal.scala:351)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:876)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:351)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 508, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-1212426270091216>", line 6, in <lambda>
TypeError: an integer is required (got type str)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:676)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:659)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

2 Answers

1 vote
/ 07 May 2020

Finally got it working with the following code.

After 8 days of digging, and with the help of a great guy on Stack Overflow ("{ link }"), we were able to come up with the following code, which works with Spark 2.4.5 on Databricks.

Aggregation and conversion

from pyspark.sql.functions import *

# Aggregate sales by month, formatting the timestamp as yyyy-MM-dd
dataF = data.select("OrderMonthYear", date_format("OrderMonthYear", 'yyyy-MM-dd').alias("dt_format"), "SaleAmount").groupBy("dt_format").sum().orderBy("dt_format").toDF("dt_format", "SaleAmount")

dataF.show()

# Strip the dashes from the formatted date string and cast it to int
results = dataF.rdd.map(lambda r: (int(r.dt_format.replace('-','')), r.SaleAmount))
df = spark.createDataFrame(results, ("dt_format", "SaleAmount"))
display(df)


Converting the DataFrame to features and labels

# convenience for specifying schema

from pyspark.mllib.regression import LabeledPoint

meta = df.select("dt_format", "SaleAmount").toDF("dt_format", "SaleAmount")
meta.show()

# Wrap each row as a LabeledPoint (label = SaleAmount, feature = dt_format)
rdd1 = meta.rdd.map(lambda r: LabeledPoint(r[1], [r[0]]))

rddtodf = spark.createDataFrame(rdd1, ("dt_format", "SaleAmount"))
display(rddtodf)

Hope this helps.

1 vote
/ 01 May 2020

I see you have commented out #.toDF("OrderMonthYear","SaleAmount") in the first line of your code. What is wrong with it?

Assuming rddData is the RDD you end up with after your transformations, you can use rddData.toDF(["a", "b", "c", ...]). For this to work, you need to know which columns are present in the RDD.
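For example, applied to the mapped RDD from your snippet, a minimal sketch could look like this (it assumes rddData holds (int, float) tuples such as (20200101, 123.45), and salesDF is just an illustrative name; the lambda itself still has to succeed first, see EDIT 2 below):

# Sketch: name the two columns of the mapped RDD explicitly
# (assumes rddData contains (int, float) tuples, e.g. (20200101, 123.45))
salesDF = rddData.toDF(["OrderMonthYear", "SaleAmount"])
salesDF.printSchema()
salesDF.show(5)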

I would suggest reading your data directly as a DataFrame. You will need to make some changes to your code, but it is worth it.


EDIT 1: Judging from the comments about how you read your data, you can read the CSV file directly as a DataFrame. You are using:

path = 'file:/databricks/driver/CogsleyServices-SalesData-US.csv'
data = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").option("sep", ",").load(path)

You don't need to use Spark's SQLContext to read the CSV. You can read it into a DataFrame directly using the SparkSession.

I just tried it with Spark/Python, and the following code for reading a sample CSV worked in a Jupyter notebook on my local machine:

spark.read.format("csv").option("header", True).option("delimiter", ",").load(r"C:\Users\Spark\Downloads\boston_housing.csv")

Here spark is your SparkSession.
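On Databricks the same pattern should work for the file in your snippet; the sketch below simply reuses the path and options you already posted, swapping sqlContext for the SparkSession:

path = 'file:/databricks/driver/CogsleyServices-SalesData-US.csv'
# Same CSV options as before, read through the SparkSession instead of SQLContext
data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("sep", ",").load(path)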

EDIT 2:

Now I understand what you are trying to do. Checking your full error log, we can see TypeError: an integer is required (got type str). The problem is in the int(r.OrderMonthYear.replace('-','')) part of the lambda. If you run summary.show() on its own, you will see that the timestamp contains - and :. If you don't plan to use that timestamp again, I would suggest converting the timestamp in your DataFrame to a Unix timestamp.

import pyspark.sql.functions as fn
from pyspark.sql.types import *

summary = data.select("OrderMonthYear", "SaleAmount").groupBy("OrderMonthYear").sum().orderBy("OrderMonthYear").withColumn("unix_timestamp", fn.unix_timestamp(fn.col('OrderMonthYear'), format='yyyy-MM-dd HH:mm:ss'))

This worked for me.
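With the extra unix_timestamp column in place, one possible way to finish the RDD-to-DataFrame conversion from your code is sketched below; it assumes the aggregated column keeps Spark's default name sum(SaleAmount):

# Use the integer unix_timestamp column instead of string-manipulating the timestamp
rddData = summary.rdd.map(lambda r: (r.unix_timestamp, r["sum(SaleAmount)"]))
schemaPeople = spark.createDataFrame(rddData, ["OrderMonthYear", "SaleAmount"])
schemaPeople.show(5)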
