Вход:
У меня есть следующий поток в NetCat:
$ nc -l -p 5555
example1
example2
example3
example4
example5
И следующий код в PySpark:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName("AccumulatorExample")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("Checkpoint_Accumulator")
dataStream = ssc.socketTextStream("localhost",5555)
exampleAccumulator = sc.accumulator(0)
def update_accumulator(x):
global exampleAccumulator
exampleAccumulator += 1
return None
exampleStream = dataStream.map(lambda x: update_accumulator(x))
exampleStream.pprint()
ssc.start()
ssc.awaitTerminationOrTimeout(15)
print(exampleAccumulator.value)
Выход
-------------------------------------------
Time: 2018-12-11 10:15:55
-------------------------------------------
None
None
None
None
None
-------------------------------------------
Time: 2018-12-11 10:16:00
-------------------------------------------
-------------------------------------------
Time: 2018-12-11 10:16:05
-------------------------------------------
5
Цель:
В приведенном выше примере значение exampleAccumulator
выводится после завершения потока.Я хочу, чтобы поток работал непрерывно, и я хочу иметь возможность доступа к exampleAccumulator.value
во время работы потока.Можно ли отправить exampleAccumulator.value
в сокет, чтобы его можно было прочитать другим приложением?