Фрейм данных Pyspark: доступ к столбцу - PullRequest
0 голосов
/ 10 апреля 2020

Я надеюсь, что каждый из вас в порядке, и Covid19 не сильно влияет на вашу жизнь.

Я борюсь с кодом PySpark, в частности, я хотел бы вызвать функцию для объекта col, что не повторяется.

from pyspark.sql.functions import col, lower, regexp_replace, split
from googletrans import Translator

def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, r"^rt ", "")
  c = regexp_replace(c, r"(https?\://)\S+", "")
  c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "") #removePunctuation 
  c = regexp_replace(c, r"\n", " ")
  c = regexp_replace(c, r"   ", " ")
  c = regexp_replace(c, r"  ", " ")  
#   c = translator.translate(c, dest='en', src='auto')
  return c

clean_text_df = uncleanedText.select(clean_text(col("unCleanedCol")).alias("sentence"))
clean_text_df.printSchema()
clean_text_df.show(10)

Как только я запускаю код в c = translator.translate(c, dest='en', src='auto'), ошибка, отображаемая в Spark, равна TypeError: Column is not iterable.

Что я хотел бы сделать перевод за словом:

С:

+--------------------+
|            sentence|
+--------------------+
|ciao team there a...|
|dear itteam i urg...|
|buongiorno segnal...|
|hi team regarding...|
|hello please add ...|
|ciao vorrei effet...|
|buongiorno ho vis...|
+--------------------+

Кому:

+--------------------+
|            sentence|
+--------------------+
|hello team there ...|
|dear itteam i urg...|
|goodmorning segna...|
|hi team regarding...|
|hello please add ...|
|hello would effet...|
|goodmorning I see...|
+--------------------+

Схема DataFrame:

root
 |-- sentence: string (nullable = true)

Может ли кто-нибудь помочь мне?

Большое спасибо

1 Ответ

1 голос
/ 10 апреля 2020

PySpark - это просто Python API, написанный для поддержки Apache Spark. Если вы хотите использовать пользовательские функции python, вам нужно определить пользовательскую функцию (udf).

Сохраните вашу функцию clean_text() как есть (с помощью translate строка закомментирована) и попробуйте следующее:

from pyspark.sql.functions import udf
from pyspark.sql.Types import StringType

def translate(c):
  return translator.translate(c, dest='en', src='auto')

translateUDF = udf(translate, StringType())

clean_text_df = uncleanedText.select(
  translateUDF(clean_text(col("unCleanedCol"))).alias("sentence")
)

Другие функции в вашем оригинале clean_text (lower и regexp_replace) являются встроенными функциями искры и работают на pyspark.sql.Column.

Имейте в виду, что использование этого udf приведет к снижению производительности. См .: Функции Spark и производительность UDF?

...