У меня проблема с производительностью при работе с NLP в Pyspark, в Databricks:
CONTEXT:
У меня есть 2 фрейма данных pyspark со столбцом «ID» и столбцом «текст», например:
Table A | Table B
ID_A TEXT_A | ID_B TEXT_B
0 text_A0 | 0 text_B0
1 text_A1 | 1 text_B1
2 text_A2 | 2 text_B2
Чтобы найти сходство между текстами, я хочу измерить косинусное сходство между каждой записью A и каждой записью B (что-то вроде подобия декартового произведения), поэтому я использую модель Word2vec
.
Первым шагом является обучение модели Word2Vec
из ml
lib, поясненной ниже:
word2vec = Word2Vec(vectorSize = 5, windowSize=2, inputCol =
'TEXT', outputCol = 'COORDINATES')
model_Word2Vec = word2vec.fit(Data_train)
Затем я применяю модель к данным A и данным B (model_Word2vec.transform(A)
и model_Word2vec.transform(B)
), получая координаты каждого текста (среднее значение слов по координатам).Например:
display(model_Word2vec.transform(A))
ID_A TEXT_A COORDINATES_A
0 text A0 [1, 5, [], [0.05, 0.1, -1.5, 0.2, -0.7]]
1 text A1 [1, 5, [], [0.15, -1.1, 0.5, 0.27, -0.1]]
2 text A2 [1, 5, [], [1.05, 1.2, -0.55, 0.2, -1.7]]
Я должен сказать, что кадры данных распределены и неизменны.
Пример сходства косинусов в pyspark:
X = [0.05, 0.1, -1.5, 0.2, -0.7]; Y = [1.0, 0.003, 2.12, 0.22, 1.3]
cos(X, Y) = X.dot(Y) / ( X.norm(2)*Y.norm(2) )
ПРОБЛЕМА:
Я хочу что-то вроде этого:
Crossjoin
ID_A COORDINATES_A ID_B COORDINATES_B Cosine
0 [0.05, 0.1, -1.5, 0.2, -0.7] 0 [1.0, 0.003, 2.12, 0.22, 1.3] -0.89
0 [0.05, 0.1, -1.5, 0.2, -0.7] 1 [0.13, 1.1, 0.5,1.27, 1.99] -0.4
1 [0.15, -1.1, 0.5, 0.27, -0.1] 0 [1.0, 0.003, 2.12, 0.22, 1.3] -0.34
1 [0.15, -1.1, 0.5, 0.27, -0.1] 1 [0.13, 1.1, 0.5,1.27, 1.99] -0.24
2 [1.05, 1.2, -0.55, 0.2, -1.7] 0 [1.0, 0.003, 2.12, 0.22, 1.3] -0.35
2 [1.05, 1.2, -0.55, 0.2, -1.7] 1 [0.13, 1.1, 0.5,1.27, 1.99] -0.31
У меня проблемы с производительностью, и я получаю что-то вроде этого:
1 ° Первый подход:
# Then we make a loop:
for i in range(0, 3):
# Here we get the coordinates of the row “i” of the data A, in a list type.
Y = Vectors.dense( A.select( col( COORDINATES_A ) ).collect()[i][0]
# Here we go through every row of the coordinates of the data B, get the coordinates (with x[0]), and compute 1- cosine similarity (that we will name “coseno”) between every coordinates of B, and the coordinate “i” of A.
Cosino_list = [[float(i)] for i in B[ ['COORDINATES_B']].rdd.map(lambda x: 1-x[0].dot(Y)/(x[0].norm(2)*Y.norm(2))).collect()]
# Here we create a data frame with a column named “coseno” and the values of Cosino_list, and concatenate the rows with the previous data:
df_cosino = df_cosino.union(spark.createDataFrame(Cosino_list, ["coseno"]))
Я думаю, что 2 цикла вызывают следующую ошибку:
(15) Spark Jobs
The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.
2 ° Второй подход:
# We make a crossjoin between A and B:
Df_crossjoin = A.crossJoin(B).
# We extract the coordinates:
column_COORDINATES_A = [(i[0].values) for i in Df_crossjoin.select(col("COORDINATES_A")).collect()]
column_COORDINATES_B = [(i[0].values) for i in Df_crossjoin.select(col("COORDINATES_B")).collect()]
Но я получаюследующая ошибка
(1) Spark Jobs
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 54, 10.139.64.6, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last) <command-2001347748501300> in <module>() ----> 1 columna_que_como_c = [(i[0].values) for i in cross_que.select(col("result_que_como_c")).collect()] /databricks/spark/python/pyspark/sql/dataframe.py in collect(self) 546 # Default path used in OSS Spark / for non-DF-ACL clusters: 547 with SCCallSiteSync(self._sc) as css: --> 548 sock_info = self._jdf.collectToPython() 549 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) 550 /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args) 1255 answer = self.gateway_client.send_command(command) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) 1258 1259 for temp_arg in temp_args: /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw):´
Буду очень признателен, если кто-нибудь поможет мне решить проблему.