Как правильно синхронизировать Rx-субъектные вызовы `.onNext`? - PullRequest
0 голосов
/ 13 января 2019

Я работаю над приложением для обмена сообщениями и хотел бы иметь службу, которая предоставляет наблюдаемую newMessages. Различные части приложения могут подписываться на наблюдаемые и отображать значки для новых сообщений и так далее.

У меня есть пара ограничений:

  1. У меня не может быть целого сообщения в полезной нагрузке уведомления (некоторые ограничения безопасности). Вместо этого, когда я получаю уведомление с uuid, я запрашиваю у сервера сообщение с этим uuid.

  2. На самом деле, я запрашиваю у сервера все новые сообщения, загружаю последнее сообщение uuid из своей локальной базы данных и запрашиваю у сервера все сообщения после этого uuid. Я кеширую все новые сообщения в локальной базе данных.

На первом шаре я подумал:

  1. Инициализировать PublicSubject с именем newMessages.

  2. Когда приходит уведомление, проверьте, есть ли у вас сообщение с этим uuid в локальной БД. Если да -> показать уведомление; если нет -> загрузить все новые сообщения с сервера, сохранить их в локальной БД, отправить их в тему newMessages и отобразить уведомление.

Примерно так:

private val newMessagesSubject = PublishSubject.create<List<UserMessage>>()
val newMessages = newMessagesSubject.observeOn(AndroidSchedulers.mainThread())

fun newNotificationArrived(uuid: UUID) {

  fun filterCurrentMessage(messages: List<UserMessage>) = messages
    .firstOrNull { it.uuid == uuid }
    ?: throw IllegalStateException("Can't find a message with id: $uuid")

  /*
   * If we manage to load from the local database, we don't emit because we assume
   * that the request that we emited the messages before storing them into the DB.
   */
  fun loadFromDatabase() = Maybe.fromCallable { messageTable.getMessageWithUuid(uuid) }
  fun loadFromServer(accessCode: String, latestMessageUuid: UUID) =
    messagingApi
      .getUserMessagesAfterUUID(accessCode, latestMessageUuid, null)
      .retryWithExponentialBackoff()
  fun loadCacheEmitAndMap(accessCode: String, latestMessageUuid: UUID) =
    loadFromServer(accessCode, latestMessageUuid)
      .doOnSuccess { messageTable.insertNewMessages(it) }
      .doOnSuccess { newMessagesSubject.onNext(it) }
      .map(::filterCurrentMessage)

  Single
    .fromCallable { accessCodeSettings.accessCode to messageTable.latestUUID }
    .flatMap { (accessCode, latestMessageUuid) ->
      loadFromDatabase()
        .switchIfEmpty(loadCacheEmitAndMap(accessCode, latestMessageUuid))
    }
    .subscribe(::showNotification)
}

При таком подходе у меня возникает следующая проблема:

  • Если два уведомления приходят одно за другим, оба могут заметить, что соответствующих uuid нет в базе данных, поэтому оба будут загружать с сервера новые сообщения, возможно один и тот же набор сообщений, оба будут сохранять сообщения в базе данных, и оба будут отправлять сообщения в теме newMessages. Итак, я получу дублированные сообщения.

    • Я не могу synchronize метод, который выбирает сообщения, потому что он также является наблюдаемым, и я не хочу блокировать его.

Как бы вы спроектировали наблюдаемый поток, чтобы гарантировать, что объект newMessages правильный?

1 Ответ

0 голосов
/ 14 января 2019

Вы можете создать внутреннюю службу, которая обрабатывает запросы на сообщения.

fun requestMessage( message: UUID ): Single<UserMessage>

Он будет ответственен за получение сообщения, если необходимо, перейдя на сервер. Внутри он будет использовать кэш, индексированный UUID. Вот подход, который я нашел полезным:

val messageCache: Map<UUID, BehaviorSubject<UserMessage>> = Maps.newConcurrentMap();

Реализация requestMessage() выглядит так:

return messageCache.computeIfAbsent(message, id -> {
   val res: BehaviorSubject.<UserMessage>create()
   getMessage(id).subscribe(res)
   return res.take(1).toSingle()
  })

(простите за мой плохой синтаксис Котлина). Это всегда будет возвращать наблюдаемое, которое в конечном итоге содержит сообщение. ConcurrentHashMap обеспечивает выдачу только одного запроса на обслуживание для любого идентификатора сообщения.

...