Play framework Scala: создайте бесконечный источник, используя потоки scala akka, и оставьте соединение с отправленными событиями сервера открытым на сервере. - PullRequest
0 голосов
/ 01 октября 2018

У нас есть требование к реализации отправленных сервером событий для следующих случаев использования:

  1. Отправка уведомлений в пользовательский интерфейс после некоторой обработки на сервере.Эта обработка основана на некоторой логике
  2. Отправка уведомлений в пользовательский интерфейс после чтения сообщений из RabbitMQ с последующим выполнением некоторой операции над ним.

Набор технологий, который мы используем Scala (2.11 / 2.12)с игровой рамкой (2.6.x).Библиотека: akka.stream.scaladsl.Source

Мы начали доказательство концепции со следующего примера https://github.com/playframework/play-scala-streaming-example, а затем расширили, создав различные источники.Мы попытались создать источник с помощью Source.apply, soure.single.

Но как только все элементы в источнике были перенесены в пользовательский интерфейс, мои потоки событий закрылись.Но я не хочу, чтобы поток событий закрывался .Также я не хочу использовать некоторые таймеры (Source.tick) или Source.repeat.

Когда мой источник был создан, коллекция имела, скажем, несколько элементов x, а затем сервис добавил еще 4 элемента.Но после x элементов поток событий закрывается, а затем снова открывается.

Есть ли какой-либо способ, которым мой поток событий может быть бесконечным и будет закрыт только в том случае, если мой сеанс вышел из системы или мы можем явно закрыть его.

// Код для KeepAlive (как указано в комментариях)

   object NotficationUtil {

      var userNotificationMap = Map[Integer, Queue[String]]()

      def addUserNotification(userId: Integer, message: String) = {
        var queue = userNotificationMap.getOrElse(userId, Queue[String]())
        queue += message
        userNotificationMap.put(userId, queue)

      }

      def pushNotification(userId: Integer): Source[JsValue, _] = {
        var queue = userNotificationMap.getOrElse(userId, Queue[String]())
         Source.single(Json.toJson(queue.dequeueAll { x => true }))
      }
    }
    @Singleton
    class EventSourceController @Inject() (cc: ControllerComponents) extends AbstractController(cc) with FlowFactory{

      def pushNotifications(user_id:Integer) = Action {
      val stream = NotficationUtil.pushNotification(user_id)
       Ok.chunked(stream.keepAlive(50.second, ()=>Json.obj("data"->"heartbeat")) via EventSource.flow).as(ContentTypes.EVENT_STREAM)
     }

}

1 Ответ

0 голосов
/ 25 октября 2018

Используйте приведенный ниже код для создания actorref и издателя

val (ref, sourcePublisher)= Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail).toMat(Sink.asPublisher(true))(Keep.both).run()

И создайте свой источник от этого издателя

val testsource = Source
      .fromPublisher[T](sourcePublisher)

И зарегистрируйте слушателя как

Ok.chunked(
        testsource.keepAlive(
          50.seconds,
          () => Json.obj("data"->"heartbeat")) via EventSource.flow)
      .as(ContentTypes.EVENT_STREAM)
      .withHeaders("X-Accel-Buffering" -> "no", "Cache-Control" -> "no-cache")

Отправьте ваши данные json рефектору, и данные будут передаваться как поток событий через этот источник во внешний интерфейс.Надеюсь, это поможет.

...