Поток Pyspark создает решетчатую структуру - PullRequest
0 голосов
/ 20 апреля 2020

Я очень новичок в потоковом воспроизведении. Я хочу написать программу для имитации структуры данных решетки с искровой потоковой передачей. Например, у нас есть входной список list["A", "B", "C", "D"], и мы поддерживаем список результатов:

time 1
["A"]
----------
time 2
["A", "B", "AB"]
---------
time 3
["A", "B", "c", "AB", "BC", "ABC"]
--------
time 4
["A", "B", "c", "D" "AB", "BC", "CD" "ABC", "BCD"]

Моя первая мысль - использовать updateStateByKey и изменить код из stateful_network_wordcount.py пример , Но я застрял, чтобы объединить ввод rdd с результатом. Код:

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: sparkLattice.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="sparkLattice")
    ssc = StreamingContext(sc, 2)
    ssc.checkpoint("checkpoint")

    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(1, 'hello'), (1, 'world')])

    def updateFunc(new_values, last_sum):
        word = ""
        for i in new_values:
            word = word + i
        if last_sum:
            return last_sum + word
        else:
            return word

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    word = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (1, word))
    oldWord = word.cache() # store this dstream to union at next process
    result = word.updateStateByKey(updateFunc)
    result1 = result.union(word) # Trying to union the incoming data
    # and union with oldWord from previous process
    result1.pprint()

    ssc.start()
    ssc.awaitTermination()

И что я ожидаю, это:

time 1
(1, u"A")
---------
time 2
(1, u"A")
(1, U"AB")
(1, u"B")
---------
time 3
(1, u"A")
(1, U"AB")
(1, u"B")
(1, u"C")
(1, U"BC")
(1, U"ABC")

Как я могу объединить поступающие данные с dstream, который использует updateStateByKey, и сохранить предыдущий dstream в union в следующий раз? Пожалуйста, любые предложения или помощь будет принята с благодарностью !!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...