I have two notebooks. In the first one I created a Spark DataFrame containing predictions:
predictions.show(3)
+--------------------+-----+--------------------+--------------------+----------+
| features|label| rawPrediction| probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[-0.6538794393542...| 1|[-0.5417241375347...|[0.36778659515978...| 1.0|
|[-0.6538794393542...| 1|[0.51214847497691...|[0.62530999194820...| 0.0|
|[1.44789077022497...| 1|[0.23167647552561...|[0.55766143934795...| 0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
predictions.printSchema()
root
|-- features: vector (nullable = true)
|-- label: long (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)
I had no choice but to convert it to a pandas DataFrame and then write that to CSV. I know I could have written the CSV directly with Spark, but I am not allowed to do that ...
predictions.toPandas().to_csv('data/baseline/predictions/predictions_1.csv', index=False)
So in the second notebook I would like to load the predictions and get back to the original schema. I tried combining ArrayType with the other data types in my schema, without success ...
from pyspark.sql.types import StructType, StructField, DoubleType, LongType, ArrayType, StringType

schema = StructType([
    StructField('features', ArrayType(StringType()), True),
    StructField('label', LongType(), True),
    StructField('rawPrediction', ArrayType(StringType()), True),
    StructField('probability', ArrayType(StringType()), True),
    StructField('prediction', ArrayType(StringType()), True)])

df = spark.read.option('inferschema', 'false').csv('data/baseline/predictions/predictions_1.csv', header=True, schema=schema)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-17-d716574c302b> in <module>()
----> 1 df = spark.read.option('inferschema','false').csv('data/baseline/predictions/predictions_1.csv', header=True, schema=schema)
/etc/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
474 path = [path]
475 if type(path) == list:
--> 476 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
477 elif isinstance(path, RDD):
478 def func(iterator):
/home/cdsw/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/etc/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'CSV data source does not support array<string> data type.;'
EDIT
As an alternative, I also tried loading it as a pandas DataFrame first and then converting to a Spark DataFrame:
import pandas as pd
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType, LongType

baseline_predictions_1 = pd.read_csv('data/baseline/predictions/predictions_1.csv')
schema = StructType([
    StructField('features', VectorUDT(), True),
    StructField('label', LongType(), True),
    StructField('rawPrediction', VectorUDT(), True),
    StructField('probability', VectorUDT(), True),
    StructField('prediction', DoubleType(), True)])
baseline_predictions_1_spark = spark.createDataFrame(baseline_predictions_1, schema=schema)
which fails with:
ValueError: field features: '[-0.653879439354218,-0.641657985767437,2.1745465575819853,-0.3942437541747706,-0.27836086647393893,-0.15247992273301034,-0.8731383370228931,1.7558595869135778,-0.07872999791923047,-0.8600514403077006,-0.08761812501346317,0.728030971700451,-0.08961758270441313,-0.14051816811964882,-0.3783655351155729,-0.020764023664758085,-1.0881108113513056,0.6862718942720165,0.6879649578230858,-0.6474546016400257,0.8207328275149384,-1.4819827281817985,0.6862718942720165,0.37756974755568345,-0.27456813413128256,0.12233574531606022,1.1951742541283155,-0.41441333187999324,1.0156242745826165,0.11511988704597224,1.5202964810591166,1.2872256417423313,-1.043892338028877,0.3234319111586267,0.17212871533095103,0.06610112328274363,-0.19664008337776648,-0.06452535228414881,-0.18341834319735326,-0.6261778019519352,-0.2026155280242467,0.6726870749554883,0.0,-0.36270454523619444,-0.3280964915007003,0.49637854701175443,-1.2782623750282807,-0.1577076671186269,0.8069610213030648,1.1814728988049719,-0.37738387311351584,-0.545476207415827,4.192585454890639,-1.0251108125906505,2.721589088276589,0.0]' is not an instance of type VectorUDT