Akka-Http, Server Sent Event и Akka Stream Source: Соединение получает «Сброс» / Потоковый источник закрывается после остановки или обновления последней страницы - PullRequest
0 голосов
/ 01 ноября 2018

Я пытаюсь создать какое-то веб-приложение, используя функцию «Сервер отправил событие» в Akka HTTP.

TL; DR : я могу запускать и использовать SSE из браузера, но если у меня есть только одна страница, и я обновляю ее или закрываю и перезагружаю страницу, точка входа SSE больше не работает, так как весь поток Akka на стороне сервера выглядит закрытым.

Я создал проект здесь:

https://github.com/totetmatt/wallemoji/tree/master/src/main/scala/fr/totetmatt/wallemoji

Я основал первую часть этого блога (+ некоторая адаптация, как это было от старой версии akka) http://loicdescotte.github.io/posts/play-akka-streams-twitter/

Здесь я создаю ActorRef Source, который получит какое-то событие и поместит его в Sink Publisher

http://github.com/totetmatt/wallemoji/blob/master/src/main/scala/fr/totetmatt/wallemoji/TwitterStream.scala#L31-L64

val (actorRef, publisher) = Source.actorRef[Status](1000, OverflowStrategy.dropHead)
  .toMat(Sink.asPublisher(true))(Keep.both)
  .run()

val statusListener = new StatusListener() {

  override def onStatus(status: Status) = {
    actorRef ! status
  }


  override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {
    // System.err.println(statusDeletionNotice)
  }

  override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {
    System.err.println(numberOfLimitedStatuses)
  }

  override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {

На основе документа https://doc.akka.io/docs/akka-http/current/sse-support.html я создаю источник из издателя, который генерирует поток SSE

http://github.com/totetmatt/wallemoji/blob/master/src/main/scala/fr/totetmatt/wallemoji/WebServer.scala#L40-L45

 ~ path("twitter") {
    get {
      complete {
        Source.fromPublisher(publisherTweet)
          .map(s=>EmojiParser.extractEmojis(s.getText))
          .mapConcat(l =>JavaConverters.asScalaIteratorConverter(l.iterator()).asScala.toSet )
          .map(x=>ServerSentEvent(x,"emoji"))
          .keepAlive(5.second, () => ServerSentEvent.heartbeat)
      }

}

Я использую данные здесь на стороне клиента

http://github.com/totetmatt/wallemoji/blob/master/templates/index.html#L78-L96

source.addEventListener("emoji", function(e) {
//console.log(e)
var col = getRandomIntInclusive(1,15)
var div = document.createElement("div");
div.classList.add("cell");
div.classList.add("animated");
div.classList.add("fadeInDown");
div.innerHTML = e.data
document.getElementById("col_"+col).prepend(div);

Эта хакерская дополнительная строка заставляет приложение работать как положено

http://github.com/totetmatt/wallemoji/blob/master/src/main/scala/fr/totetmatt/wallemoji/WebServer.scala#L33

 Source.fromPublisher(publisherTweet).runWith(Sink.ignore)

Если эта строка закомментирована / удалена , я могу подключиться в первый раз, открыть столько веб-страниц параллельно без каких-либо проблем. Но до тех пор, пока больше не загружается страница , источник выглядит остановленным и генерирует эти журналы

INFO] [10/23/2018 08:29:09.858] [wall-emoji-akka.actor.default-dispatcher-2] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.860] [wall-emoji-akka.actor.default-dispatcher-14] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.863] [wall-emoji-akka.actor.default-dispatcher-14] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.863] [wall-emoji-akka.actor.default-dispatcher-14] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [4] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.871] [wall-emoji-akka.actor.default-dispatcher-2] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [5] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.877] [wall-emoji-akka.actor.default-dispatcher-2] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [6] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.891] [wall-emoji-akka.actor.default-dispatcher-2] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [7] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.891] [wall-emoji-akka.actor.default-dispatcher-14] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [8] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.892] [wall-emoji-akka.actor.default-dispatcher-14] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [9] dead letters encountered. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [10/23/2018 08:29:09.892] [wall-emoji-akka.actor.default-dispatcher-14] [akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource] Message [twitter4j.StatusJSONImpl] without sender to Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877] was not delivered. [10] dead letters encountered, no more dead letters will be logged. If this is not an expected behavior, then [Actor[akka://wall-emoji/system/StreamSupervisor-0/flow-0-2-actorRefSource#1389184877]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown

Насколько я понимаю, это событие закрытия ActorRef Source, если клиент больше не потребляет последний SSE Source.

Я могу понять, что Источник, сгенерированный для каждого соединения, закрыт, но зачем идти вверх по всему графику, чтобы закрыть все?

Есть ли лучший способ избежать закрытия этого ActorRef? Или что не так с кодом, чтобы вести себя так?

...