Я новичок в Streaming Framework Spark и пытался обработать поток Twitter.
Я нахожусь в процессе написания тестовых примеров для одного и того же и понимаю, что могу использовать Spark StreamingSuiteBase, который поможет мне тестировать ввод в качестве потока моих функций.
Но я написал функцию, которая принимает DStream [Status] в качестве входных данных и после обработки выдает DStream [String] в качестве выходных данных.
API, который я использую от StreamingSuiteBase, это testOperation.
test("Filter only words Starting with #") {
val inputTweet = List(List("this is #firstHash"), List("this is #secondHash"), List("this is #thirdHash"))
val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))
testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)
А это функция, на которую отправляется входной сигнал ..
def getText(englishTweets: DStream[Status]): DStream[String] = {
println(englishTweets.toString)
val hashTags = englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))
hashTags
}
Но я получаю сообщение об ошибке "несоответствие типов" из-за DStream [Status] и DStream [String]. Как мне издеваться над Stream [Status].