Я экспериментирую с созданием простого сервера чата WebSocket с Play 2.8 + akka-streams + redi scala
Это конечная точка моего контроллера, которая работает - но я уверен, что должен быть лучший способ , В частности, я не думаю, что мне пришлось использовать непостоянную очередь для буферизации входящих сообщений перед их отправкой на Source.tick
- наверняка должен быть способ отправить их непосредственно на Source
?
Есть ли более подходящий клиент Redis, который я мог бы попробовать?
Кроме того, как мне справиться с уничтожением подписки Redis при закрытом сокете?
val rc = RedisClient(host = "127.0.0.1", port = 6397)
def socket(user: String) = WebSocket.accept[String, String] { request =>
val q = mutable.Queue[String]()
val rps = new RedisPubSub(
host = "127.0.0.1",
port = 6397,
channels = Nil,
patterns = Seq("user/*/chatter"),
onPMessage = msg => q.enqueue(msg.channel + " " + msg.data.utf8String)
)
val source: Source[String, Cancellable] = Source.tick(0 seconds, 1 seconds, Unit)
.map { s =>
Source(q.dequeueAll(_ => true).toList)
}
.flatMapConcat(identity)
val sink: Sink[String, Future[Done]] = Sink.foreach[String] { s =>
rc.publish(s"user/$user/chatter", s)
}
Flow.fromSinkAndSource(sink, source)
}
Любые предложения по улучшению будут с благодарностью!
Спасибо
NFV