Я только начал изучать spark и пытался провести анализ текста с использованием spark / nltk и python в среде Windows, прикреплен мой код и полное сообщение об ошибке, пожалуйста, посмотрите на него и помогите мне
У меня есть набор обзоров в tsv файлах, которые я загрузил в hdfs, а затем распечатал комментарии, и все работает нормально, печать работает только при попытке загрузить их в график, это выдает ошибку
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, lit
from functools import reduce
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import pandas as pd
import re
import string
spark = SparkSession.builder.appName('AmazonAlexaUserReview').master('local').getOrCreate()
schema = StructType([
StructField("rating", IntegerType(), True),
StructField("date", StringType(), True),
StructField("variation", StringType(), True),
StructField("verified_reviews", StringType(), True),
StructField("feedback", IntegerType(), True)])
data_DF = spark.read.format("org.apache.spark.csv").option("delimiter","\t").schema(schema).option("mode", "PERMISSIVE").option("inferSchema", "True").csv('amazon_alexa.tsv')
data_DF.createOrReplaceTempView("data")
reviews_rdd = data_DF.select("verified_reviews").rdd.flatMap(lambda x: x)
# print(reviews_rdd.collect())
header = reviews_rdd.first()
data_rmv_col = reviews_rdd.filter(lambda row: row != header)
lowerCase_sentRDD = data_rmv_col.map(lambda x : x.lower())
def sent_TokenizeFunct(x):
return nltk.sent_tokenize(x)
sentenceTokenizeRDD = lowerCase_sentRDD.map(sent_TokenizeFunct)
def word_TokenizeFunct(x):
splitted = [word for line in x for word in line.split()]
return splitted
wordTokenizeRDD = sentenceTokenizeRDD.map(word_TokenizeFunct)
def removeStopWordsFunct(x):
from nltk.corpus import stopwords
stop_words=set(stopwords.words('english'))
filteredSentence = [w for w in x if not w in stop_words]
return filteredSentence
stopwordRDD = wordTokenizeRDD.map(removeStopWordsFunct)
def removePunctuationsFunct(x):
list_punct=list(string.punctuation)
filtered = [''.join(c for c in s if c not in list_punct) for s in x]
filtered_space = [s for s in filtered if s] #remove empty space
return filtered
rmvPunctRDD = stopwordRDD.map(removePunctuationsFunct)
def lemmatizationFunct(x):
nltk.download('wordnet')
lemmatizer = WordNetLemmatizer()
finalLem = [lemmatizer.lemmatize(s) for s in x]
return finalLem
lem_wordsRDD = rmvPunctRDD.map(lemmatizationFunct)
def joinTokensFunct(x):
joinedTokens_list = []
x = " ".join(x)
return x
joinedTokens = lem_wordsRDD.map(joinTokensFunct)
def extractPhraseFunct(x):
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))
def leaves(tree):
"""Finds NP (nounphrase) leaf nodes of a chunk tree."""
for subtree in tree.subtrees(filter=lambda t: t.label() == 'NP'):
yield subtree.leaves()
def get_terms(tree):
for leaf in leaves(tree):
term = [w for w, t in leaf if not w in stop_words]
yield term
sentence_re = r'(?:(?:[A-Z])(?:.[A-Z])+.?)|(?:\w+(?:-\w+)*)|(?:\$?\d+(?:.\d+)?%?)|(?:...|)(?:[][.,;"\'?():-_`])'
grammar = r"""
NBAR:
{<NN.*|JJ>*<NN.*>} # Nouns and Adjectives, terminated with Nouns
NP:
{<NBAR>}
{<NBAR><IN><NBAR>} # Above, connected with in/of/etc...
"""
chunker = nltk.RegexpParser(grammar)
tokens = nltk.regexp_tokenize(x, sentence_re)
postoks = nltk.tag.pos_tag(tokens) # Part of speech tagging
tree = chunker.parse(postoks) # chunking
terms = get_terms(tree)
temp_phrases = []
for term in terms:
if len(term):
temp_phrases.append(' '.join(term))
finalPhrase = [w for w in temp_phrases if w] # remove empty lists
return finalPhrase
extractphraseRDD = joinedTokens.map(extractPhraseFunct)
def sentimentWordsFunct(x):
from nltk.sentiment.vader import SentimentIntensityAnalyzer
analyzer = SentimentIntensityAnalyzer()
senti_list_temp = []
for i in x:
y = ''.join(i)
vs = analyzer.polarity_scores(y)
senti_list_temp.append((y, vs))
senti_list_temp = [w for w in senti_list_temp if w]
sentiment_list = []
for j in senti_list_temp:
first = j[0]
second = j[1]
for (k, v) in second.items():
if k == 'compound':
if v < 0.0:
sentiment_list.append((first, "Negative"))
elif v == 0.0:
sentiment_list.append((first, "Neutral"))
else:
sentiment_list.append((first, "Positive"))
return sentiment_list
sentimentRDD = extractphraseRDD.map(sentimentWordsFunct)
freqDistRDD = extractphraseRDD.flatMap(lambda x : nltk.FreqDist(x).most_common()).map(lambda x: x).reduceByKey(lambda x,y : x+y).sortBy(lambda x: x[1], ascending = False)
df_fDist = freqDistRDD.toDF() #converting RDD to spark dataframe
df_fDist.createOrReplaceTempView("myTable")
df2 = spark.sql("SELECT _1 AS Keywords, _2 as Frequency from myTable limit 20") #renaming columns
pandD = df2.toPandas() #converting spark dataframes to pandas dataframes
pandD.plot.barh(x='Keywords', y='Frequency', rot=1, figsize=(10,8))
wordcloudConvertDF = pandD.set_index('Keywords').T.to_dict('records')
wordcloud = WordCloud(width=800, height=500, random_state=21, max_font_size=100, relative_scaling=0.5, colormap='Dark2').generate_from_frequencies(dict(*wordcloudConvertDF))
plt.figure(figsize=(14, 10))
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis('off')
plt.show()
Я получаю полное сообщение об ошибке:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:> (0 + 1) / 1]19/05/07 14:25:27 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:224)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
19/05/07 14:25:27 ERROR PythonRunner: This may have been caused by a prior exception:
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:224)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
19/05/07 14:25:27 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:224)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
19/05/07 14:25:28 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:224)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
19/05/07 14:25:28 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
File "D:/SearchForBussiness/MonDemo.py", line 28, in <module>
header = reviews_rdd.first()
File "E:\SearchForBussiness\lib\site-packages\pyspark\rdd.py", line 1378, in first
rs = self.take(1)
File "E:\SearchForBussiness\lib\site-packages\pyspark\rdd.py", line 1360, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "E:\SearchForBussiness\lib\site-packages\pyspark\context.py", line 1069, in runJob
sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "E:\SearchForBussiness\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "E:\SearchForBussiness\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "E:\SearchForBussiness\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:224)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:212)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:224)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Process finished with exit code 1