Что такое хорошая практика для модульного тестирования потоковой передачи искры в Python - PullRequest
0 голосов
/ 07 сентября 2018

Я пытаюсь проверить функциональность потоковой передачи искры и хотел бы видеть, что DStreams - это то, что я ожидаю. Как я могу получить доступ к DStreams, чтобы проверить их содержимое? Можно ли сериализовать или преобразовать DStreams в другие типы данных, такие как массив или словарь?

Вот мой исходный код, учитывая, что у меня есть папка ./tmp и файлы с 3 строками текста добавляются в нее каждую секунду:

import unittest

class StreamingMethodTest(unittest.TestCase):


    def test_streaming_filesystem(self):
        file_dir = "./tmp"

        def test_fun(rdd):
            # check if "count" equals 3
            rdd.foreach(lambda record: self.assertEqual(record[1], 3))

        sc = SparkContext(appName="streaming test")
        ssc = StreamingContext(sc, 1)

        lines = ssc.textFileStream(file_dir)
        counts = lines.map(lambda line: ("count", 1)).reduceByKey(lambda a, b: a + b)
        counts.foreachRDD(test_fun)
        ssc.start()
        ssc.awaitTermination(10) # terminate after 10 seconds
...