Актер Akka выдает ошибку нулевого указателя, причина неизвестна - PullRequest
0 голосов
/ 12 мая 2019

У меня есть контроллер покоя, который вызывает службу, которая, в свою очередь, вызывает актера, чтобы получить запрос из имитированной базы данных. Сообщение попадает к актеру, но приложение падает до того, как актер может ответить, и от актера возникает исключение нулевого указателя. Я использую akka http для контроллера и директивы маршрутизации для составления ответа. Это мои зависимости:

"com.typesafe.akka" %% "akka-http"   % "10.1.8",
"com.typesafe.akka" %% "akka-actor"  % "2.5.22",
"com.typesafe.akka" %% "akka-stream" % "2.5.22",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8"




class CacheActor extends Actor {

  val tweetRepositoryInMemory: TweetRepositoryInMemory = new TweetRepositoryInMemory()
  val log = Logging(context.system, this)

  var tweetMap: scala.collection.mutable.Map[String, List[String]] =
    scala.collection.mutable.Map[String, List[String]]()

  // consult the in-memory map, if the username is not found, call the repository, update  the map, and return the tweets
  def queryRepo(username: String): Future[Option[List[String]]] = {
    if (tweetMap isDefinedAt username) {

      return Future(tweetMap.get(username))
    } else {
      var listOfTweetTexts: List[String] = List[String]()

      val queryLimit = 10

      val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)

      onComplete(resultTweets) {
        case Success(tweets) =>
          for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }

          tweetMap(username) = listOfTweetTexts

          return Future(Option(listOfTweetTexts))

        case Failure(t) =>
          log.error("An error has occurred: " + t.getMessage)
          return null
      }
    }
    return null
  }

  def receive = {

    case message: TweetQuery => // .take(message.limit)

      val queryResult: Future[Option[List[String]]] = queryRepo(message.userName)

      queryResult onComplete {
        case Success(result) => sender() ! result

        case Failure(t) => log.error("An error has occurred: " + t.getMessage)
      }
  }
}

1 Ответ

0 голосов
/ 13 мая 2019

Трассировка стека была бы полезна, но я подозреваю, что эта строка здесь в вашем receive вызывает NPE:

queryResult onComplete {

Ваша функция queryRepo возвращает null, если tweetMap неопределенный в username.

ОБНОВЛЕНИЕ

И вот почему:

Функция queryRepo возвращает Furture[Seq[String]] ровно в одном случае

if (tweetMap isDefinedAt username) {
  return Future(tweetMap.get(username))
}

В блоке else вы создаете Future[Seq[String]]

val resultTweets: Future[Seq[String]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)

, но никогда не возвращаете его. Вместо вы передаете обратный вызов Future, который выполняется асинхронно после завершения фьючерса, следовательно, onComplete.(Я заметил, что вы не вызываете onComplete на Future напрямую, а функцию onComplete, которая принимает в качестве аргумента будущее и частичную функцию, и я предполагаю, что эта функция регистрирует обычный обратный вызов onComplete.)

Таким образом, результатом блока else вашего оператора if будет Unit, а not Future[Seq[String]]

Код

 for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }

 tweetMap(username) = listOfTweetTexts

 return Future(Option(listOfTweetTexts))

равенскорее всего выполняется после того, как queryRepo уже вернул null.

def queryRepo(username: String): Future[Option[List[String]]] = {
  if (tweetMap isDefinedAt username) {
    ...
  } else {
    ...
  }
  return null // <--- here
}

End UPDATE

Если вы измените следующие строки:

val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)

  onComplete(resultTweets) {
    case Success(tweets) =>
      for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }

      tweetMap(username) = listOfTweetTexts

      return Future(Option(listOfTweetTexts))

    case Failure(t) =>
      log.error("An error has occurred: " + t.getMessage)
      return null
  }

до:

tweetRepositoryInMemory.searchByUserName(username, queryLimit).map { tweets =>
  // NOTE: This happens asynchronously in the `Future`. IT is better not to close over local variables  
  val listOfTweetTexts = for (tweet <- tweets) yield { tweet.text }
  // again, access to an actor member from within a `Future` is not wise or rather a bug in the making. 
  // But I will not refactor that much here. The way to do this would be to send a message to self and process the mutable member within `receive`
  tweetMap(username) = listOfTweetTexts
  Option(listOfTweetTexts)
}

NullPointerException больше не должно происходить.

Однако у меня сложилось впечатление, что вы могли бы использовать больше тренировок с futures , актеры и функциональное программирование в scala в целом.

Например,

  • изменяемое локальное состояние актера работает только в том случае, если к нему обращаются изнутри receive, а не в асинхронном Future или Thread
  • один обычно не использует ключевое слово return в scala
  • , если не для совместимости с Java API, нет необходимости когда-либо возвращать null.
  • и еще много очков ...
...