Тестирование Twitter с помощью Spark Streaming API - PullRequest
0 голосов
/ 27 августа 2018

Я новичок в Streaming Framework Spark и пытался обработать поток Twitter. Я нахожусь в процессе написания тестовых примеров для одного и того же и понимаю, что могу использовать Spark StreamingSuiteBase, который поможет мне тестировать ввод в качестве потока моих функций. Но я написал функцию, которая принимает DStream [Status] в качестве входных данных и после обработки выдает DStream [String] в качестве выходных данных. API, который я использую от StreamingSuiteBase, это testOperation.

test("Filter only words Starting with #")  {
  val inputTweet = List(List("this is #firstHash"), List("this is #secondHash"), List("this is #thirdHash"))
  val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))

  testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)

А это функция, на которую отправляется входной сигнал ..

 def getText(englishTweets: DStream[Status]): DStream[String] = {
    println(englishTweets.toString)

    val hashTags = englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))

    hashTags
  }

Но я получаю сообщение об ошибке "несоответствие типов" из-за DStream [Status] и DStream [String]. Как мне издеваться над Stream [Status].

1 Ответ

0 голосов
/ 29 октября 2018

Итак, я решил эту проблему, получив статус Twitter из API "createStatus" TwitterObjectFactory. Не было нужды издеваться TwitterStatus. Даже если вам удастся посмеяться над этим, есть проблемы с сериализацией. Итак, это лучшее решение:

val rawJson = Source.fromURL(getClass.getResource("/tweetStatus.json")).getLines.mkString
val tweetStatus = TwitterObjectFactory.createStatus(rawJson)

Надеюсь, это кому-нибудь поможет!

...