Подключите подписку Redis к WebSocket с потоками play + akka - PullRequest
0 голосов
/ 07 февраля 2020

Я экспериментирую с созданием простого сервера чата WebSocket с Play 2.8 + akka-streams + redi scala

Это конечная точка моего контроллера, которая работает - но я уверен, что должен быть лучший способ , В частности, я не думаю, что мне пришлось использовать непостоянную очередь для буферизации входящих сообщений перед их отправкой на Source.tick - наверняка должен быть способ отправить их непосредственно на Source?

Есть ли более подходящий клиент Redis, который я мог бы попробовать?

Кроме того, как мне справиться с уничтожением подписки Redis при закрытом сокете?

  val rc = RedisClient(host = "127.0.0.1", port = 6397)

  def socket(user: String) = WebSocket.accept[String, String] { request =>

    val q = mutable.Queue[String]()

    val rps = new RedisPubSub(
      host = "127.0.0.1",
      port = 6397,
      channels = Nil,
      patterns = Seq("user/*/chatter"),
      onPMessage = msg => q.enqueue(msg.channel + " " + msg.data.utf8String)
    )

    val source: Source[String, Cancellable] = Source.tick(0 seconds, 1 seconds, Unit)
      .map { s =>
        Source(q.dequeueAll(_ => true).toList)
      }
      .flatMapConcat(identity)

    val sink: Sink[String, Future[Done]] = Sink.foreach[String] { s =>
      rc.publish(s"user/$user/chatter", s)
    }

    Flow.fromSinkAndSource(sink, source)
  }

Любые предложения по улучшению будут с благодарностью!

Спасибо

NFV

...