Прогнозы загрузки проблемы схемы PySpark - PullRequest
0 голосов
/ 16 марта 2020

У меня есть две тетради. В моей первой записной книжке я создал Spark DF, содержащий прогнозы:

predictions.show(3)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[-0.6538794393542...|    1|[-0.5417241375347...|[0.36778659515978...|       1.0|
|[-0.6538794393542...|    1|[0.51214847497691...|[0.62530999194820...|       0.0|
|[1.44789077022497...|    1|[0.23167647552561...|[0.55766143934795...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows

predictions.printSchema() 

root
 |-- features: vector (nullable = true)
 |-- label: long (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

У меня не было выбора, кроме как преобразовать это в Pandas DF и затем записать в csv. Я знаю, что мог написать напрямую в csv с помощью spark, но мне не разрешено это делать ...

predictions.toPandas().to_csv('data/baseline/predictions/predictions_1.csv', index=False)

Поэтому во втором блокноте я хотел бы загрузить прогнозы и вернуться к исходной схеме. Я попытался скомбинировать ArrayType и другие типы данных в моей схеме без успеха ...

from pyspark.sql.types import StructType, StructField, DoubleType, LongType, ArrayType, StringType

schema = StructType([StructField('features', ArrayType(StringType()), True),
                     StructField('label', LongType(), True),
                     StructField('rawPrediction', ArrayType(StringType()), True),
                     StructField('probability', ArrayType(StringType()), True), 
                     StructField('prediction', ArrayType(StringType()), True)])

df = spark.read.option('inferschema','false').csv('data/baseline/predictions/predictions_1.csv', header=True, schema=schema)

During handling of the above exception, another exception occurred:    
AnalysisException                         Traceback (most recent call last)
<ipython-input-17-d716574c302b> in <module>()
----> 1 df = spark.read.option('inferschema','false').csv('data/baseline/predictions/predictions_1.csv', header=True, schema=schema)

/etc/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
    474             path = [path]
    475         if type(path) == list:
--> 476             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    477         elif isinstance(path, RDD):
    478             def func(iterator):

/home/cdsw/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/etc/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'CSV data source does not support array<string> data type.;'

EDIT

В качестве альтернативы я также пытался загрузить как pandas df сначала, а затем преобразовать в Spark DF:

baseline_predictions_1 = pd.read_csv('data/baseline/predictions/predictions_1.csv')

schema = StructType([StructField('features', VectorUDT(), True),
                     StructField('label', LongType(), True),
                     StructField('rawPrediction', VectorUDT(), True),
                     StructField('probability', VectorUDT(), True), 
                     StructField('prediction', DoubleType(), True)])

baseline_predictions_1_spark = spark.createDataFrame(baseline_predictions_1, schema=schema)

с ошибкой:

ValueError: field features: '[-0.653879439354218,-0.641657985767437,2.1745465575819853,-0.3942437541747706,-0.27836086647393893,-0.15247992273301034,-0.8731383370228931,1.7558595869135778,-0.07872999791923047,-0.8600514403077006,-0.08761812501346317,0.728030971700451,-0.08961758270441313,-0.14051816811964882,-0.3783655351155729,-0.020764023664758085,-1.0881108113513056,0.6862718942720165,0.6879649578230858,-0.6474546016400257,0.8207328275149384,-1.4819827281817985,0.6862718942720165,0.37756974755568345,-0.27456813413128256,0.12233574531606022,1.1951742541283155,-0.41441333187999324,1.0156242745826165,0.11511988704597224,1.5202964810591166,1.2872256417423313,-1.043892338028877,0.3234319111586267,0.17212871533095103,0.06610112328274363,-0.19664008337776648,-0.06452535228414881,-0.18341834319735326,-0.6261778019519352,-0.2026155280242467,0.6726870749554883,0.0,-0.36270454523619444,-0.3280964915007003,0.49637854701175443,-1.2782623750282807,-0.1577076671186269,0.8069610213030648,1.1814728988049719,-0.37738387311351584,-0.545476207415827,4.192585454890639,-1.0251108125906505,2.721589088276589,0.0]' is not an instance of type VectorUDT
...