Я хочу применить алгоритм случайного леса к фрейму данных, состоящему из трех столбцов, а именно JournalID, IndexedJournalID (получен с помощью StringIndexer of Spark) и вектора объектов.Я использовал приведенный ниже код для чтения кадра данных из файла паркета и применения String Indexer к столбцу JournalID для преобразования его в категориальный тип.
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import VectorUDT
df=spark.read.parquet('JouID-UBTFIDFVectors-server22.parquet')
labelIndexer = StringIndexer(inputCol="journalid", outputCol="IndexedJournalID")
labelsDF=labelIndexer.fit(df)
df1=labelsDF.transform(df)
# This function converts sparse vectors to dense vectors....I applied this on raw features column to convert them to VectorUDT type.....
parse_ = udf(lambda l: Vectors.dense(l), VectorUDT())
df2 = df1.withColumn("featuresNew", parse_(df1["features"])).drop('features')
Новая схема данных (df2) выглядит следующим образом:
root
|-- journalid: string (nullable = true)
|-- indexedLabel: double (nullable = false)
|-- featuresNew: vector (nullable = true)
Затем я делю df2 на обучающие и тестовые наборы и создаю объект случайного классификатора леса, как показано ниже:
(trainingData, testData) = df2.randomSplit([0.8, 0.2])
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="featuresNew", numTrees=2 )
Наконец, примените метод fit () к trainingData, полученному выше.
rfModel=rf.fit(trainingData)
Благодаря этому я могу обучать модель на 100 экземплярах входных данных.Однако, по всем данным обучения, эта строка дает следующую ошибку.
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 53652)
Traceback (most recent call last):
File "/data/sntps/code/conda3/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
self.process_request(request, client_address)
File "/data/sntps/code/conda3/lib/python3.6/socketserver.py", line 348, in process_request
self.finish_request(request, client_address)
File "/data/sntps/code/conda3/lib/python3.6/socketserver.py", line 361, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/data/sntps/code/conda3/lib/python3.6/socketserver.py", line 696, in __init__
self.handle()
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/pyspark/accumulators.py", line 235, in handle
num_updates = read_int(self.rfile)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/pyspark/serializers.py", line 685, in read_int
raise EOFError
EOFError
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:41060)
Traceback (most recent call last):
File "/data/sntps/code/conda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2910, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-10-46d7488961c7>", line 1, in <module>
rfModel=rf.fit(trainingData)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/pyspark/ml/base.py", line 132, in fit
return self._fit(dataset)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 288, in _fit
java_model = self._fit_java(dataset)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 285, in _fit_java
return self._java_obj.fit(dataset._jdf)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 336, in get_return_value
format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o90.fit
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/data/sntps/code/conda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 1828, in showtraceback
stb = value._render_traceback_()
AttributeError: 'Py4JError' object has no attribute '_render_traceback_'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/data/sp/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
.(traceback...not writing due to space issue)
.
.
Py4JError: An error occurred while calling o90.fit
Эта ошибка не очень наглядна, и поэтому мне стало трудно определить, где я иду не так.Любая помощь очень помогла бы.
Входное описание: Входной информационный кадр Содержит 2696512 строк, а вектор элементов каждой строки имеет длину 262144.