Как я могу отправить результаты аккумулятора в розетку? - PullRequest
0 голосов
/ 11 декабря 2018

Вход:

У меня есть следующий поток в NetCat:

$ nc -l -p 5555
example1
example2
example3
example4
example5

И следующий код в PySpark:

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName("AccumulatorExample")

ssc = StreamingContext(sc, 5)
ssc.checkpoint("Checkpoint_Accumulator")
dataStream = ssc.socketTextStream("localhost",5555)

exampleAccumulator = sc.accumulator(0)

def update_accumulator(x):
    global exampleAccumulator
    exampleAccumulator += 1
    return None

exampleStream = dataStream.map(lambda x: update_accumulator(x))
exampleStream.pprint()
ssc.start()
ssc.awaitTerminationOrTimeout(15)

print(exampleAccumulator.value)

Выход

-------------------------------------------
Time: 2018-12-11 10:15:55
-------------------------------------------
None
None
None
None
None

-------------------------------------------
Time: 2018-12-11 10:16:00
-------------------------------------------

-------------------------------------------
Time: 2018-12-11 10:16:05
-------------------------------------------

5

Цель:

В приведенном выше примере значение exampleAccumulator выводится после завершения потока.Я хочу, чтобы поток работал непрерывно, и я хочу иметь возможность доступа к exampleAccumulator.value во время работы потока.Можно ли отправить exampleAccumulator.value в сокет, чтобы его можно было прочитать другим приложением?

...