Сохранение разреженных векторов PySpark / Spark ML в libsvm или разреженном формате - PullRequest
0 голосов
/ 27 августа 2018

Я использовал Spark ML для преобразований в моем большом наборе данных и хотел бы экспортировать результирующие столбцы функций SparseVectors и меток как libSVM (или какой-либо иной разреженный формат).

Немного предыстории: я использую машину с оперативной памятью 450 Гб, работающую под управлением PySpark, и я использовал FeatureHasher с numFeatures = 2 ** 26 (довольно жесткое требование). Кроме того, у меня возникали проблемы с исключениями OutOfMemory и NegativeArraySizeExceptions при попытке записать DataFrame как паркет.

Поскольку формат libSVM является редким форматом и может быть легко прочитан другими библиотеками Python, я изучаю возможности написания как libSVM.

Я обратил внимание, что в Spark MLlib есть функция для записи данных в формате libSVM, но есть ли подобная функциональность в Spark ML?

Я пытался

from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors 
from pyspark.mllib.regression import LabeledPoint

d = df_final.select('label','final_features').rdd.map(lambda x : LabeledPoint(x[0],MLLibVectors.fromML(x[1])))

без удачи. (не хватает памяти)

Py4JJavaError: An error occurred while calling o1389.javaToPython.
: java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
at org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:3186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
...