Я работаю над приложением для обмена сообщениями и хотел бы иметь службу, которая предоставляет наблюдаемую newMessages
. Различные части приложения могут подписываться на наблюдаемые и отображать значки для новых сообщений и так далее.
У меня есть пара ограничений:
У меня не может быть целого сообщения в полезной нагрузке уведомления (некоторые ограничения безопасности). Вместо этого, когда я получаю уведомление с uuid
, я запрашиваю у сервера сообщение с этим uuid
.
На самом деле, я запрашиваю у сервера все новые сообщения, загружаю последнее сообщение uuid
из своей локальной базы данных и запрашиваю у сервера все сообщения после этого uuid
. Я кеширую все новые сообщения в локальной базе данных.
На первом шаре я подумал:
Инициализировать PublicSubject
с именем newMessages
.
Когда приходит уведомление, проверьте, есть ли у вас сообщение с этим uuid
в локальной БД. Если да -> показать уведомление; если нет -> загрузить все новые сообщения с сервера, сохранить их в локальной БД, отправить их в тему newMessages
и отобразить уведомление.
Примерно так:
private val newMessagesSubject = PublishSubject.create<List<UserMessage>>()
val newMessages = newMessagesSubject.observeOn(AndroidSchedulers.mainThread())
fun newNotificationArrived(uuid: UUID) {
fun filterCurrentMessage(messages: List<UserMessage>) = messages
.firstOrNull { it.uuid == uuid }
?: throw IllegalStateException("Can't find a message with id: $uuid")
/*
* If we manage to load from the local database, we don't emit because we assume
* that the request that we emited the messages before storing them into the DB.
*/
fun loadFromDatabase() = Maybe.fromCallable { messageTable.getMessageWithUuid(uuid) }
fun loadFromServer(accessCode: String, latestMessageUuid: UUID) =
messagingApi
.getUserMessagesAfterUUID(accessCode, latestMessageUuid, null)
.retryWithExponentialBackoff()
fun loadCacheEmitAndMap(accessCode: String, latestMessageUuid: UUID) =
loadFromServer(accessCode, latestMessageUuid)
.doOnSuccess { messageTable.insertNewMessages(it) }
.doOnSuccess { newMessagesSubject.onNext(it) }
.map(::filterCurrentMessage)
Single
.fromCallable { accessCodeSettings.accessCode to messageTable.latestUUID }
.flatMap { (accessCode, latestMessageUuid) ->
loadFromDatabase()
.switchIfEmpty(loadCacheEmitAndMap(accessCode, latestMessageUuid))
}
.subscribe(::showNotification)
}
При таком подходе у меня возникает следующая проблема:
Если два уведомления приходят одно за другим, оба могут заметить, что соответствующих uuid
нет в базе данных, поэтому оба будут загружать с сервера новые сообщения, возможно один и тот же набор сообщений, оба будут сохранять сообщения в базе данных, и оба будут отправлять сообщения в теме newMessages
. Итак, я получу дублированные сообщения.
- Я не могу
synchronize
метод, который выбирает сообщения, потому что он также является наблюдаемым, и я не хочу блокировать его.
Как бы вы спроектировали наблюдаемый поток, чтобы гарантировать, что объект newMessages
правильный?