pyspark updateStateByKey fails when calling my function
0 votes
/ 27 August 2018

I'm just trying to run a stateful streaming code example, but it fails with an error and I can't figure out why.

Spark 2.3 with Python 3.6 on a Cloudera VM 5.13.3

Run options:

--master local[*] --queue PyCharmSpark pyspark-shell 

My code:

from pyspark import SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *

conf = (SparkConf()
       .setAppName("ch2_dstreams_t1.py"))

spark = SparkSession.builder \
     .appName(" ") \
     .config(conf=conf) \
     .getOrCreate()


# Create a local StreamingContext with a batch interval of 10 seconds
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
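# updateStateByKey requires checkpointing; "checkpoint" is a directory for persisting state between batches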
ssc.checkpoint("checkpoint")

# define the update function
def updatetotalcount(currentcount, countstate):
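    # currentcount: list of new values for this key in the current batch
    # countstate: running total from previous batches (None the first time a key appears)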
    if countstate is None:
        countstate = 0
    return sum(currentcount, countstate)

# Create a DStream that will connect to hostname:port, here localhost:7777
lines = ssc.socketTextStream("localhost", 7777)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))

wordCounts = pairs.reduceByKey(lambda x, y: x + y)

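# Keep a running total per word across all batches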
totalCounts = wordCounts.updateStateByKey(updatetotalcount)

totalCounts.pprint()

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()

The stream starts and listens on the socket, but as soon as I type lines into the terminal application, it crashes with the following error:

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-------------------------------------------
Time: 2018-08-27 11:40:10
-------------------------------------------

[Stage 0:>                                                          (0 + 1) / 1]18/08/27 11:40:15 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:15 WARN storage.BlockManager: Block input-0-1535395215600 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395215800 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216000 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216200 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216400 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:20 WARN storage.BlockManager: Putting block rdd_30_0 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
.
18/08/27 11:40:20 WARN storage.BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
18/08/27 11:40:20 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 22)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/08/27 11:40:20 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 8.0 (TID 22, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The root cause is probably my updatetotalcount function: when I comment out the updateStateByKey(updatetotalcount) transformation, the word counts are printed in the output just fine. With it in place, the traceback points straight at the function:

File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
        return sum(currentcount, countstate)
    TypeError: _() takes 1 positional argument but 2 were given

Please advise: why am I getting this error?

1 Answer

0 votes
/ 28 August 2018

The problem was caused by using

from pyspark.sql.functions import *

which shadows Python's built-in functions. Here the built-in sum is replaced by pyspark.sql.functions.sum, which expects a single column argument. If you want to use pyspark.sql.functions, import it under a namespace instead, e.g.:

import pyspark.sql.functions as f
...
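The error message itself hints at the shadowing: in this Spark version most helpers in pyspark.sql.functions are generated wrappers that take a single column argument and whose underlying code object is named _, hence TypeError: _() takes 1 positional argument but 2 were given when the shadowed sum is called with two arguments. A minimal sketch of the corrected pieces (assuming the rest of the script stays as in the question):

import pyspark.sql.functions as f  # namespaced, so the Python built-in sum() stays intact

def updatetotalcount(currentcount, countstate):
    if countstate is None:
        countstate = 0
    # built-in sum(iterable, start): add this batch's counts to the running total
    return sum(currentcount, countstate)

With the wildcard import gone, sum resolves to the built-in again and updateStateByKey works as intended.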