Вычислить новый столбец в Spark Dataframe, пересекая столбец списка токенов в df1 с текстовым столбцом в df2 с помощью pyspark - PullRequest
2 голосов
/ 08 мая 2020

Я использую Spark 2.4.5, и мне нужно рассчитать оценку тональности из столбца списка токенов (MeaningfulWords столбец) из df1, согласно словам в df2 (spani sh словарь тональности) . В df1 я должен создать новый столбец со списком оценок токенов и еще один столбец со средним значением оценок (сумма оценок / количество слов) каждой записи. Если какой-либо токен в списке (df1) отсутствует в словаре (df2), засчитывается ноль.

Dataframes выглядит так:

df1.select("ID","MeaningfulWords").show(truncate=True, n=5)
+------------------+------------------------------+
|                ID|               MeaningfulWords|
+------------------+------------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto...|
|abcde00000qMq00002|[clientes, contentos, servi...|
|abcde00000qMQ00003|                 [resto, bien]|
|abcde00000qMQ00004|[mal, servicio, no, antiend...|
|abcde00000qMq00005|[gestion, adecuada, proble ...|
+------------------+------------------------------+

df2.show(5)
+-----+----------+
|score|      word|
+-----+----------+
| 1.68|abandonado|
| 3.18|    abejas|
|  2.8|    aborto|
| 2.46| abrasador|
| 8.13|    abrazo|
+-----+----------+

Новые столбцы для добавления в df1 должно выглядеть так:

+------------------+---------------------+
|         MeanScore|            ScoreList|
+------------------+---------------------+
|              2.95|[3.10, 2.50, 1.28,...|
|              2.15|[1.15, 3.50, 2.75,...|
|              2.75|[4.20, 1.00, 1.75,...|
|              3.25|[3.25, 2.50, 3.20,...|
|              3.15|[2.20, 3.10, 1.28,...|
+------------------+---------------------+

Я просмотрел некоторые параметры, используя .join, но использование столбцов с разными типами данных дает ошибку. Я также попытался преобразовать Dataframes в RDD и вызвать функцию:

def map_words_to_values(review_words, dict_df):
return [dict_df[word] for word in review_words if word in dict_df]

RDD1=swRemoved.rdd.map(list) 
RDD2=Dict_df.rdd.map(list)

reviewsRDD_dict_values = RDD1.map(lambda tupple: (tupple[0], map_words_to_values(tupple[1], RDD2)))
reviewsRDD_dict_values.take(3)

Но с этой опцией я получаю сообщение об ошибке:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Я нашел несколько примеров для оценки текста с помощью afinn библиотека. Но это не работает с текстом spani sh.

Я хочу попытаться использовать собственные функции pyspark вместо использования udf, чтобы не повлиять на производительность, если это возможно. Но я новичок в искре, и я хотел бы найти способ сделать это искровым.

1 Ответ

2 голосов
/ 09 мая 2020

Вы можете сделать это, сначала присоединившись, используя array_contains word, затем groupBy с агрегатами first, collect_list и mean. (spark2.4+)

welcome to SO

df1.show()

#+------------------+----------------------------+
#|ID                |MeaningfulWords             |
#+------------------+----------------------------+
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|
#|abcde00000qMq00002|[clientes, contentos, servi]|
#|abcde00000qMQ00003|[resto, bien]               |
#+------------------+----------------------------+

df2.show()

#+-----+---------+
#|score|     word|
#+-----+---------+
#| 1.68|     casa|
#|  2.8|  alejado|
#| 1.03|     buen|
#| 3.68|    gusto|
#| 0.68| clientes|
#|  2.1|contentos|
#| 2.68|    servi|
#| 1.18|    resto|
#| 1.98|     bien|
#+-----+---------+


from pyspark.sql import functions as F
df1.join(df2, F.expr("""array_contains(MeaningfulWords,word)"""),'left')\
   .groupBy("ID").agg(F.first("MeaningfulWords").alias("MeaningfullWords")\
                      ,F.collect_list("score").alias("ScoreList")\
                      ,F.mean("score").alias("MeanScore"))\
                      .show(truncate=False)

#+------------------+----------------------------+-----------------------+------------------+
#|ID                |MeaningfullWords            |ScoreList              |MeanScore         |
#+------------------+----------------------------+-----------------------+------------------+
#|abcde00000qMQ00003|[resto, bien]               |[1.18, 1.98]           |1.58              |
#|abcde00000qMq00002|[clientes, contentos, servi]|[0.68, 2.1, 2.68]      |1.8200000000000003|
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|[1.68, 2.8, 1.03, 3.68]|2.2975            |
#+------------------+----------------------------+-----------------------+------------------+
...