Проблема производительности Pyspark (Databricks). НЛП проблема - PullRequest
1 голос
/ 10 марта 2019

У меня проблема с производительностью при работе с NLP в Pyspark, в Databricks:

CONTEXT:
У меня есть 2 фрейма данных pyspark со столбцом «ID» и столбцом «текст», например:

Table A              |   Table B
ID_A  TEXT_A         |   ID_B    TEXT_B
0     text_A0        |   0       text_B0
1     text_A1        |   1       text_B1
2     text_A2        |   2       text_B2

Чтобы найти сходство между текстами, я хочу измерить косинусное сходство между каждой записью A и каждой записью B (что-то вроде подобия декартового произведения), поэтому я использую модель Word2vec.
Первым шагом является обучение модели Word2Vec из ml lib, поясненной ниже:

word2vec = Word2Vec(vectorSize = 5, windowSize=2, inputCol = 
'TEXT', outputCol = 'COORDINATES')
model_Word2Vec = word2vec.fit(Data_train)

Затем я применяю модель к данным A и данным B (model_Word2vec.transform(A) и model_Word2vec.transform(B)), получая координаты каждого текста (среднее значение слов по координатам).Например:

display(model_Word2vec.transform(A))

ID_A    TEXT_A    COORDINATES_A     
0       text A0   [1, 5, [], [0.05, 0.1, -1.5, 0.2, -0.7]]      
1       text A1   [1, 5, [], [0.15, -1.1, 0.5, 0.27, -0.1]]     
2       text A2   [1, 5, [], [1.05, 1.2, -0.55, 0.2, -1.7]]             

Я должен сказать, что кадры данных распределены и неизменны.

Пример сходства косинусов в pyspark:

X = [0.05, 0.1, -1.5, 0.2, -0.7]; Y = [1.0, 0.003, 2.12, 0.22, 1.3] 
cos(X, Y) = X.dot(Y) / ( X.norm(2)*Y.norm(2) ) 

ПРОБЛЕМА:
Я хочу что-то вроде этого:

Crossjoin
ID_A  COORDINATES_A                 ID_B COORDINATES_B                 Cosine
0     [0.05, 0.1, -1.5, 0.2, -0.7]  0    [1.0, 0.003, 2.12, 0.22, 1.3]  -0.89
0     [0.05, 0.1, -1.5, 0.2, -0.7]  1    [0.13, 1.1, 0.5,1.27, 1.99]    -0.4
1     [0.15, -1.1, 0.5, 0.27, -0.1] 0    [1.0, 0.003, 2.12, 0.22, 1.3]  -0.34
1     [0.15, -1.1, 0.5, 0.27, -0.1] 1    [0.13, 1.1, 0.5,1.27, 1.99]    -0.24
2     [1.05, 1.2, -0.55, 0.2, -1.7] 0    [1.0, 0.003, 2.12, 0.22, 1.3]  -0.35
2     [1.05, 1.2, -0.55, 0.2, -1.7] 1    [0.13, 1.1, 0.5,1.27, 1.99]    -0.31

У меня проблемы с производительностью, и я получаю что-то вроде этого:

1 ° Первый подход:

# Then we make a loop:
for i in range(0, 3):
  # Here we get the coordinates of the row “i” of the data A, in a list type.
  Y = Vectors.dense( A.select( col( COORDINATES_A ) ).collect()[i][0]

  # Here we go through every row of the coordinates of the data B, get the coordinates (with x[0]), and compute 1- cosine similarity (that we will name “coseno”) between every coordinates of B, and the coordinate “i” of A.
  Cosino_list = [[float(i)] for i in B[ ['COORDINATES_B']].rdd.map(lambda x: 1-x[0].dot(Y)/(x[0].norm(2)*Y.norm(2))).collect()]

  # Here we create a data frame with a column named “coseno” and the values of Cosino_list, and concatenate the rows with the previous data:
  df_cosino = df_cosino.union(spark.createDataFrame(Cosino_list, ["coseno"]))

Я думаю, что 2 цикла вызывают следующую ошибку:

(15) Spark Jobs
The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.

2 ° Второй подход:

# We make a crossjoin between A and B:
Df_crossjoin = A.crossJoin(B).

# We extract the coordinates:
column_COORDINATES_A = [(i[0].values) for i in Df_crossjoin.select(col("COORDINATES_A")).collect()]
column_COORDINATES_B = [(i[0].values) for i in Df_crossjoin.select(col("COORDINATES_B")).collect()]

Но я получаюследующая ошибка

(1) Spark Jobs
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 54, 10.139.64.6, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
--------------------------------------------------------------------------- 
Py4JJavaError Traceback (most recent call last) <command-2001347748501300> in <module>() ----> 1 columna_que_como_c = [(i[0].values) for i in cross_que.select(col("result_que_como_c")).collect()] /databricks/spark/python/pyspark/sql/dataframe.py in collect(self) 546 # Default path used in OSS Spark / for non-DF-ACL clusters: 547 with SCCallSiteSync(self._sc) as css: --> 548 sock_info = self._jdf.collectToPython() 549 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer()))) 550 /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args) 1255 answer = self.gateway_client.send_command(command) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) 1258 1259 for temp_arg in temp_args: /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw):´

Буду очень признателен, если кто-нибудь поможет мне решить проблему.

...