Я пытаюсь проверить функциональность потоковой передачи искры и хотел бы видеть, что DStreams - это то, что я ожидаю. Как я могу получить доступ к DStreams, чтобы проверить их содержимое? Можно ли сериализовать или преобразовать DStreams в другие типы данных, такие как массив или словарь?
Вот мой исходный код, учитывая, что у меня есть папка ./tmp
и файлы с 3 строками текста добавляются в нее каждую секунду:
import unittest
class StreamingMethodTest(unittest.TestCase):
def test_streaming_filesystem(self):
file_dir = "./tmp"
def test_fun(rdd):
# check if "count" equals 3
rdd.foreach(lambda record: self.assertEqual(record[1], 3))
sc = SparkContext(appName="streaming test")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream(file_dir)
counts = lines.map(lambda line: ("count", 1)).reduceByKey(lambda a, b: a + b)
counts.foreachRDD(test_fun)
ssc.start()
ssc.awaitTermination(10) # terminate after 10 seconds