I'm simply trying to run a Stateful Streaming example, but it fails with an error, and I can't figure out why.
Spark 2.3 with Python 3.6 on a Cloudera VM 5.13.3.
Run options:
--master local[*] --queue PyCharmSpark pyspark-shell
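The trailing pyspark-shell suggests these flags go through the standard PYSPARK_SUBMIT_ARGS environment variable rather than a spark-submit call; for reference, this is roughly how I set it up in the PyCharm run configuration (my assumption of the usual PyCharm setup, not something the script itself requires):

import os

# Assumed PyCharm setup: PYSPARK_SUBMIT_ARGS must be set before the
# SparkSession is created and must end with "pyspark-shell".
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] --queue PyCharmSpark pyspark-shell"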
My code:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *
conf = (SparkConf()
        .setAppName("ch2_dstreams_t1.py"))
spark = SparkSession.builder \
    .appName(" ") \
    .config(conf=conf) \
    .getOrCreate()
# Create a local StreamingContext with a batch interval of 10 seconds
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")
# define the update function
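# updateStateByKey calls this as f(new_values, last_state):
# currentcount is the list of counts seen for a key in the current batch,
# countstate is the running total from earlier batches (None on first sight)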
def updatetotalcount(currentcount, countstate):
    if countstate is None:
        countstate = 0
    return sum(currentcount, countstate)
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 7777)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
totalCounts = wordCounts.updateStateByKey(updatetotalcount)
totalCounts.pprint()
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
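The socket is fed from a separate terminal; I'm assuming here the usual netcat invocation from the Spark Streaming docs, on the port the script connects to:

nc -lk 7777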
The stream starts and listens on the socket, but as soon as I type lines into the terminal it crashes with the following error:
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-------------------------------------------
Time: 2018-08-27 11:40:10
-------------------------------------------
[Stage 0:> (0 + 1) / 1]
18/08/27 11:40:15 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:15 WARN storage.BlockManager: Block input-0-1535395215600 replicated to only 0 peer(s) instead of 1 peers
... (the same pair of replication warnings repeats for every received block) ...
18/08/27 11:40:20 WARN storage.BlockManager: Putting block rdd_30_0 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
process()
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
18/08/27 11:40:20 WARN storage.BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
18/08/27 11:40:20 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 22)
org.apache.spark.api.python.PythonException: Traceback (most recent call last): ... (same Python traceback as above)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/08/27 11:40:20 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 8.0 (TID 22, localhost, executor driver): org.apache.spark.api.python.PythonException: ... (same Python traceback and Java stack as above)
The root cause is probably my updatetotalcount function: when I comment out the updateStateByKey(updatetotalcount) transformation, the per-batch word counts print fine. With it enabled, the traceback always points at the same line:
File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
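To rule out the function logic itself, I also ran it by hand in a fresh Python interpreter with no PySpark imports, and there it behaves exactly as I expect, with sum(iterable, start) being the plain builtin; this is only a sanity check outside the streaming job:

def updatetotalcount(currentcount, countstate):
    if countstate is None:
        countstate = 0
    return sum(currentcount, countstate)

print(updatetotalcount([1, 1, 1], None))  # 3: builtin sum([1, 1, 1], 0)
print(updatetotalcount([2, 1], 5))        # 8: new counts added to prior state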
Please advise: why am I getting this error?