Pyspark конвертирует PipelinesRDD в Spark DataFrame - PullRequest
0 голосов
/ 07 августа 2020

Я использую Spark 2.3.1 и выполняю NLP в искре, когда печатаю тип RDD, который он показывает и при выполнении команды

rdd.collect()

на PipelineRDD выводится

['вариант настоящего изобретения включает в себя объединение двух беспроводное устройство, устанавливающее по меньшей мере один режим формирования пары с двумя устройствами, выполняющее по меньшей мере одно событие движения в пару, по меньшей мере одно беспроводное устройство удовлетворяет по меньшей мере одно условие образования пары, обнаруживающее удовлетворение, по меньшей мере, одно условие образования пары, отвечающее за создание пары двух беспроводных устройств, ответ обнаруживает удовлетворение, по меньшей мере, одно условие образования пары, предоставленный многочисленный аспект ',' настоящее изобретение относится Система беспроводной связи, в частности, настоящее изобретение относится к способу передачи управляющей информации pucch с устройством системы беспроводной связи, содержащим этап получения множества второго потока символов модуляции, соответствующего множеству символов мультиплексирования с частотным разделением scfdma с одной несущей, распространение множества первых потоков символов модуляции, формирование первого потока символов модуляции, соответствующего символу scfdma в первом слот получения множества поток мультиплексных символов, выполняющий процесс предварительного кодирования дискретного преобразования Фурье dft, поток множества вторых символов модуляции, передающий множество потоков комплексных символов pucch, в котором поток множества вторых символов модуляции скремблирован на уровне символов scfdma dog church aardwolf abacus ']

Я хочу создать такой фрейм данных, чтобы добавить каждое слово в строки фрейма данных.

+--------------+
|    text      |
+--------------+
|  embodiment  |
|  present     |
|  invention   |
....
....
|  aardwolf    |
|  abacus      |
+--------------+

Вот мой код

import pyspark
import nltk
import string


from pyspark import SparkContext
from nltk.stem import WordNetLemmatizer

from pyspark.ml.feature import NGram
from pyspark.sql.types import ArrayType,StructType,StructField,StringType

from pyspark.sql import SparkSession


sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('Spark Example').getOrCreate()

Source_path="Folder_of_multiple_text_file"


data=sc.textFile(Source_path)

lower_casetext = data.map(lambda x:x.lower())



# splitting_rdd = lower_casetext.map(lambda x:x.split(" "))
# print(splitting_rdd.collect())


# Function to perform Sentence tokeniaztion
def sent_TokenizeFunct(x):
    return nltk.sent_tokenize(x)

sentencetokenization_rdd = lower_casetext.map(sent_TokenizeFunct)

# Function to perform Word tokenization

def word_TokenizeFunct(x):
    splitted = [word for line in x for word in line.split()]
    return splitted

wordtokenization_rdd = sentencetokenization_rdd.map(word_TokenizeFunct)


# Remove Stop Words

def removeStopWordsFunct(x):
    from nltk.corpus import stopwords
    stop_words=set(stopwords.words('english'))
    filteredSentence = [w for w in x if not w in stop_words]
    return filteredSentence
stopwordRDD = wordtokenization_rdd.map(removeStopWordsFunct)


# Remove Punctuation marks

def removePunctuationsFunct(x):
    list_punct=list(string.punctuation)
    filtered = [''.join(c for c in s if c not in list_punct) for s in x] 
    filtered_space = [s for s in filtered if s] #remove empty space 
    return filtered
rmvPunctRDD = stopwordRDD.map(removePunctuationsFunct)

# Perform Lemmatization

def lemma(x):

    lemmatizer = WordNetLemmatizer()

    final_rdd = [lemmatizer.lemmatize(s) for s in x]
    return final_rdd

lem_wordsRDD = rmvPunctRDD.map(lemma)

# Join tokens

def joinTokensFunct(x):
    joinedTokens_list = []
    x = " ".join(x)
    return x

joinedTokensRDD = lem_wordsRDD.map(joinTokensFunct)


print(joinedTokensRDD.collect())
print(type(joinedTokensRDD))

Ответы [ 2 ]

0 голосов
/ 07 августа 2020

Я изменил свой код, удалив шаг токенов соединения и напрямую преобразовав lem_wordsRDD во фрейм данных с помощью следующего кода.

df = lem_wordsRDD.map(lambda x: (x, )).toDF(["features"])

explod_df = df.withColumn("values", explode("features"))

tokenized_df = explod_df.select("values")
tokenized_df.show()
0 голосов
/ 07 августа 2020

Примерно так, но адаптируем соответственно:

data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

rdd = spark.sparkContext.parallelize(data)
rdd.collect

# generate a pipelined RDD with some dummy logic
rdd = rdd.filter(lambda x: x[2] == x[2])

from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show()
...