Pyspark - saveAsTable выдает ошибку индекса, в то время как dataframe show () работает отлично - PullRequest
0 голосов
/ 19 мая 2018

Попытка сохранить фрейм данных в виде таблицы.

Я также могу создать фрейм данных и временную таблицу.Но при сохранении того же информационного кадра с помощью saveAsTable () выдается ошибка индекса.

Я проверил схему Dataframe, похоже, все в порядке.

Не уверен, в чем проблема, и не смог получить из журнала ничего, кроме ошибки индекса.

>

>>> sqlContext.sql('select * from bx_users limit 2').show()
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+

>>> bx_users_df.show(2)
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+
only showing top 2 rows

>>> bx_users_df.printSchema()
root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)

>>> bx_users_df.write.format('parquet').mode('overwrite').saveAsTable('bx_user')
18/05/19 00:12:36 ERROR util.Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<stdin>", line 1, in <lambda>
IndexError: list index out of range

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
18/05/19 00:12:37 ERROR datasources.DefaultWriterContainer: Task attempt attempt_201805190012_0222_m_000000_0 aborted.
18/05/19 00:12:37 ERROR executor.Executor: Exception in task 0.0 in stage 222.0 (TID 245)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

1 Ответ

0 голосов
/ 19 мая 2018

Одним из самых больших недостатков Spark является то, что операции ленивы, и даже если вызвать действие, Spark постарается выполнить как можно меньше работы.Например,

show попытается оценить только 20 первых строк - если в конвейере нет широких преобразований, он не будет обрабатывать все данные.Вот почему show может работать, в то время как saveAsTable терпит неудачу.

Ваш код ошибки в лямбда-выражении:

File "<stdin>", line 1, in <lambda>

в результате:

IndexError: list index out of range

Это почти всегда ошибка пользователя в отсутствии обработки искаженных данных.Я подозреваю, что ваш код содержит что-то похожее на

(sc.textFile(...)
    .map(lambda line: line.split(...)
    .map(lambda xs: (xs[0], xs[1], xs[3]))) 

, и ваш код завершается ошибкой, когда строка не содержит ожидаемое количество аргументов.

В целом предпочитают стандартные функции, которые обрабатывают возможные исключения, или используют другие методы , чтобы избежать сбоев.

И если это просто анализ файла данных с разделителями (CSV, TSV), используйте Spark CSV reader .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...