У нас есть требование к реализации отправленных сервером событий для следующих случаев использования:
- Отправка уведомлений в пользовательский интерфейс после некоторой обработки на сервере.Эта обработка основана на некоторой логике
- Отправка уведомлений в пользовательский интерфейс после чтения сообщений из RabbitMQ с последующим выполнением некоторой операции над ним.
Набор технологий, который мы используем Scala (2.11 / 2.12)с игровой рамкой (2.6.x).Библиотека: akka.stream.scaladsl.Source
Мы начали доказательство концепции со следующего примера https://github.com/playframework/play-scala-streaming-example, а затем расширили, создав различные источники.Мы попытались создать источник с помощью Source.apply, soure.single.
Но как только все элементы в источнике были перенесены в пользовательский интерфейс, мои потоки событий закрылись.Но я не хочу, чтобы поток событий закрывался .Также я не хочу использовать некоторые таймеры (Source.tick) или Source.repeat.
Когда мой источник был создан, коллекция имела, скажем, несколько элементов x, а затем сервис добавил еще 4 элемента.Но после x элементов поток событий закрывается, а затем снова открывается.
Есть ли какой-либо способ, которым мой поток событий может быть бесконечным и будет закрыт только в том случае, если мой сеанс вышел из системы или мы можем явно закрыть его.
// Код для KeepAlive (как указано в комментариях)
object NotficationUtil {
var userNotificationMap = Map[Integer, Queue[String]]()
def addUserNotification(userId: Integer, message: String) = {
var queue = userNotificationMap.getOrElse(userId, Queue[String]())
queue += message
userNotificationMap.put(userId, queue)
}
def pushNotification(userId: Integer): Source[JsValue, _] = {
var queue = userNotificationMap.getOrElse(userId, Queue[String]())
Source.single(Json.toJson(queue.dequeueAll { x => true }))
}
}
@Singleton
class EventSourceController @Inject() (cc: ControllerComponents) extends AbstractController(cc) with FlowFactory{
def pushNotifications(user_id:Integer) = Action {
val stream = NotficationUtil.pushNotification(user_id)
Ok.chunked(stream.keepAlive(50.second, ()=>Json.obj("data"->"heartbeat")) via EventSource.flow).as(ContentTypes.EVENT_STREAM)
}
}