Избежание нескольких вызовов в наблюдаемом конвейере - PullRequest
0 голосов
/ 03 апреля 2019

Я пытаюсь создать метод GetAndFetch, который сначала возвращает данные из кэша, затем извлекает и возвращает данные из веб-службы и, наконец, обновляет кэш.

Такая функция уже существует в akavache, однако данные, которые она извлекает или хранит, похожи на большой двоичный объект.то есть, если я заинтересован в rss фиде, я мог бы работать только на уровне всего фида, а не отдельных элементов.Я заинтересован в создании версии, которая возвращает элементы как IObservable<Item>.Это дает преимущество в том, что новые Item могут отображаться, как только они возвращаются service, и не ожидают всех Items s.

public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
    // The basic idea is to first get the cached objects
    IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);

    // Then call the service   
    IObservable<Item> fetchObs = service.GetItems(feedUrl);

    // Consolidate the cache & the retrieved data and then update cache
    IObservable<Item> updateObs = fetchObs
                                      .ToArray()
                                      .MyFilter() // filter out duplicates between retried data and cache
                                      .SelectMany(arg =>
                                      {
                                          return cache.InsertObject(feedUrl, arg)
                                          .SelectMany(__ => Observable.Empty<Item>());
                                      });

    // Then make sure cache retrieval, fetching and update is done in order
    return cacheBlobObject.SelectMany(x => x.ToObservable())
                .Concat(fetchObs)
                .Concat(upadteObs);
}

Проблема с моим подходом заключается в том, что Concat(upadteObs) подписывается на fetchObs и заканчивает тем, что снова вызывает service.GetItems(feedUrl), что расточительно.

1 Ответ

1 голос
/ 03 апреля 2019

Вы говорите так, будто вам нужна перегрузка .Publish(share => { ... }).

Попробуйте это:

public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
    // The basic idea is to first get the cached objects
    IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);

    return
        service
            .GetItems(feedUrl)
            .Publish(fetchObs =>
            {
                // Consolidate the cache & the retrieved data and then update cache
                IObservable<Item> updateObs =
                    fetchObs
                        .ToArray()
                        .MyFilter() // filter out duplicates between retried data and cache
                        .SelectMany(arg =>
                            cache
                                .InsertObject(feedUrl, arg)
                                .SelectMany(__ => Observable.Empty<Item>()));

                // Then make sure cache retrieval, fetching and update is done in order
                return
                    cacheBlobObject
                        .SelectMany(x => x.ToObservable())
                        .Concat(fetchObs)
                        .Concat(updateObs);
            });
}

Меня беспокоит Concat звонки - возможно, они должны быть Merge.

Кроме того, похоже, что ваш вызов service.GetItems все равно получает все элементы - как избежать элементов, уже находящихся в кеше?


Альтернативная реализация, основанная на комментариях:

public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
    return
    (
        from hs in cache.GetObject<HashSet<Item>>(feedUrl)
        let ids = new HashSet<string>(hs.Select(x => x.Id))
        select
            hs
                .ToObservable()
                .Merge(
                    service
                        .GetItems(feedUrl)
                        .Where(x => !ids.Contains(x.Id))
                        .Do(x => cache.InsertObject(feedUrl, new [] { x })))
    ).Merge();
}
...