Я очень новичок в потоковом воспроизведении. Я хочу написать программу для имитации структуры данных решетки с искровой потоковой передачей. Например, у нас есть входной список list["A", "B", "C", "D"]
, и мы поддерживаем список результатов:
time 1
["A"]
----------
time 2
["A", "B", "AB"]
---------
time 3
["A", "B", "c", "AB", "BC", "ABC"]
--------
time 4
["A", "B", "c", "D" "AB", "BC", "CD" "ABC", "BCD"]
Моя первая мысль - использовать updateStateByKey
и изменить код из stateful_network_wordcount.py пример , Но я застрял, чтобы объединить ввод rdd с результатом. Код:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: sparkLattice.py <hostname> <port>", file=sys.stderr)
sys.exit(-1)
sc = SparkContext(appName="sparkLattice")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint")
# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(1, 'hello'), (1, 'world')])
def updateFunc(new_values, last_sum):
word = ""
for i in new_values:
word = word + i
if last_sum:
return last_sum + word
else:
return word
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
word = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (1, word))
oldWord = word.cache() # store this dstream to union at next process
result = word.updateStateByKey(updateFunc)
result1 = result.union(word) # Trying to union the incoming data
# and union with oldWord from previous process
result1.pprint()
ssc.start()
ssc.awaitTermination()
И что я ожидаю, это:
time 1
(1, u"A")
---------
time 2
(1, u"A")
(1, U"AB")
(1, u"B")
---------
time 3
(1, u"A")
(1, U"AB")
(1, u"B")
(1, u"C")
(1, U"BC")
(1, U"ABC")
Как я могу объединить поступающие данные с dstream, который использует updateStateByKey, и сохранить предыдущий dstream в union в следующий раз? Пожалуйста, любые предложения или помощь будет принята с благодарностью !!