Как вернуть словарь из пользовательской функции во фрейм данных pyspark? - PullRequest
0 голосов
/ 04 марта 2019

Я определил следующую функцию:

def test_function(string):
    import nltk
    from nltk.sentiment.vader import SentimentIntensityAnalyzer

    lower_string = string.lower()
    sid = SentimentIntensityAnalyzer()

    res_dict = sid.polarity_scores(lower_string)
    return res_dict

Я преобразовал функцию в udf, чтобы передать ее во фрейм данных Pyspark, выполнив следующее:

udf_test_function = udf(lambda z: test_function(z), MapType(StringType(), DoubleType()))

У меня есть фрейм данных pyspark, комментарии, в котором есть только столбец, комментарий, содержащий строки.

+--------------------+
|             comment|
+--------------------+
|                 nan|
|                 nan|
|                 nan|
|So far it has per...|
|I purchased it fo...|
+--------------------+
only showing top 5 rows

И у него есть схема:

root
 |-- comment: string (nullable = true)

Я передаю udf вфрейм данных следующим образом:

test_result = comments.select('comment',udf_test_function('comment').alias('Result'))

Результатом этой операции должен быть новый фрейм данных test_result, который действительно имеет тип pyspark.sql.dataframe.DataFrame и должен иметь два столбца comment и Result.Но когда я пытаюсь показать результаты test_result.show(5), я получаю следующую ошибку:

Py4JJavaError: An error occurred while calling o161.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 10, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

Это происходит только тогда, когда я использую функции NLTK, как будто я передаю другие функции в фрейм данных, он работает правильно.

Я использую ноутбук Jupyter 4.4.0 с spark 2.4.0 на Python 3.7.2 (все установлено на моем ПК локально);У меня сложилось впечатление, что это скорее проблема конфигурации, чем логическая проблема.

Но любая помощь будет принята с благодарностью, поскольку я новичок в настройке Spark / Pyspark.

1 Ответ

0 голосов
/ 08 марта 2019

Мне удалось решить проблему, это была проблема конфигурации, поскольку я не отправлял библиотеку NLTK на рабочие узлы моей установки Spark.Я следовал этому руководству и смог решить свою проблему: NLTK в учебнике Spark

Я также исправил функцию, чтобы лучше обрабатывать ошибки:

def vader_SID(input_string='Error'):
    import nltk
    from nltk.sentiment.vader import SentimentIntensityAnalyzer

    sid = SentimentIntensityAnalyzer()

    try:
        lower_string = input_string.lower()
        res_dict = sid.polarity_scores(lower_string)  
        return res_dict

    except ValueError:
        print('Value Error!')

    except AttributeError:
        print('Atribute Error!')

    except TypeError:
        print('Type Error!')

Полученные данныекадр из функции:

+--------------------+--------------------+
|             comment|              Result|
+--------------------+--------------------+
|                 nan|[neg -> 0.0, pos ...|
|                 nan|[neg -> 0.0, pos ...|
|                 nan|[neg -> 0.0, pos ...|
|So far it has per...|[neg -> 0.0, pos ...|
|I purchased it fo...|[neg -> 0.0, pos ...|
+--------------------+--------------------+
only showing top 5 rows
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...