Используйте pyspark для запуска пользовательской функции из python - PullRequest
1 голос
/ 27 октября 2019

Я написал пользовательскую функцию, которая будет находить большинство вхождений слов из файла .txt. Мне нужно запустить его через PySpark как RDD

Я написал функцию с именем top_five, чей единственный параметр это имя_файла

import collections

def top_five(file_name):    

    file = open(file_name, 'r', encoding = 'utf8')

    list1 = []
    for line in file:
        print(line)
        words = line.split()
        for i in words:
            j =''.join(filter(str.isalpha, i))
            j = j.lower()
            if len(j) > 5:
                list1.append(j)            

    count = collections.Counter(list1)

    most_occur = count.most_common(5)

    print("The most used words in the Applied Data Science Textbook is:")
    for item in most_occur:
        print("\t" + item[0] + " occured " + str(item[1]) + " times")

    return

Фактические результаты должны быть последними 3 строками функции top_fiveгде его печатает каждое слово и количество вхождений

Ответы [ 2 ]

1 голос
/ 28 октября 2019

Где возможно, вы должны использовать API Spark целиком, а не пытаться обернуть существующую функцию, которая зависит от циклов и локального состояния, для конкретного исполнителя (от использования счетчика словаря)

file_name = 'README.md'

spark = SparkSession.builder\
  .master('local[*]')\
  .getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile(file_name)\
  .flatMap(lambda x: x.lower().split())\  # lowercase and split lines
  .map(lambda word: ''.join(filter(str.isalpha, word)))\  # remove non-alpha characters from words
  .filter(lambda word: len(word) > 5)\  # filter short words
  .map(lambda word: (word, 1))\  # count each words
  .reduceByKey(lambda a,b: a+b)\  # sum the counts by word
  .sortBy(lambda t: t[1], False)  # sort the words by descending counts

# Collect to a Python list
top_words = rdd.take(5)
for word_pair in top_words:
    print(f'"{word_pair[0]}" occurred {word_pair[1]} times')
0 голосов
/ 27 октября 2019

Неясно, какой из вышеперечисленных объектов связан с фреймом данных PySpark, который в идеале вы бы здесь увеличивали.

Преобразуйте свою функцию в PySpark UDF , который позволит вамПримените свою логику к кадру данных PySpark без необходимости преобразования в Pandas и обратно.

Для очень больших кадров данных общеизвестно, что пользовательские функции могут работать очень плохо / работать долго. Это тоже мой личный опыт.

...