Я пытаюсь преобразовать два пустых вектора (вывод из pyspark.ml PCA) в PySpark DataFrame, а затем записать этот DataFrame в мою среду Hive, но кажется, что созданные мной DataFrames нарушаются фундаментальным образом, что Я не понимаю.
Ниже приведен игрушечный пример для генерации ошибки; рабочая версия этого примера преуспевает в моей среде Jupyter Notebook (PySpark 2.1), но не работает при запуске через командную строку (PySpark 2.2) в производственном кластере.
Я не смог найти ничего в документации по обновлению с 2.1 до 2.2, чтобы предположить, почему эта проблема может существовать.
import numpy as np
import pandas as pd
spark = SparkSession.builder.getOrCreate()
A = np.array(range(10))
B = np.array(list("ABCDEFGHIJ"))
pdDF = pd.DataFrame(B, columns=(["B"]), index=A)
sDF = spark.createDataFrame(pdDF)
Пока все хорошо. Изучение прекурсоров для sDF:
>>> A
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> B
array(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'],
dtype='<U1')
>>> pdDF
B
0 A
1 B
2 C
3 D
4 E
5 F
6 G
7 H
8 I
9 J
Схема sDF выглядит хорошо, я думаю.
>>> sDF.schema
StructType(List(StructField(B,StringType,true)))
Но попытка «взять» две строки приводит к ~ 100 строкам трассировки ошибок, которые я не понимаю:
>>> sDF.take(2)
19/05/26 22:45:28 ERROR scheduler.TaskSetManager: Task 0 in stage 104.0 failed 4 times; aborting job
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/sql/dataframe.py", line 476, in take
return self.limit(num).collect()
File "/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/sql/dataframe.py", line 438, in collect
sock_info = self._jdf.collectToPython()
File "/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/data/2/parcels/SPARK2-2.2.0.cloudera4-1.cdh5.13.3.p0.603055/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1568.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 104.0 failed 4 times, most recent failure: Lost task 0.3 in stage 104.0 (TID 4267, anp-r01wn07.c03.hadoop.td.com, executor 74): java.io.IOException: Cannot run program "/usr/local/anaconda3/bin/python": error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:169)
at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:95)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:69)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:132)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:380)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 29 more
Естественно, я надеюсь увидеть верхние строки данных. Это сообщение об ошибке такое же, как при попытке записать таблицу в Hive.