Создайте IObservable и немедленно верните результат кэшированной асинхронной операции - PullRequest
1 голос
/ 08 декабря 2010

Я использую реактивные расширения для вызова асинхронного метода и хочу кэшировать результат и возвращать его для последующих вызовов метода.

Как мне создать экземпляр Observable, вернуть его и предоставить данные (cacheResult), необходимые для подписки?

public IObservable<Bar> GetBars(int pageIndex, int pageSize)
{
   var @params = new object[] { pageIndex, pageSize };
   var cachedResult = _cache.Get(@params);
   if (cachedResult != null)
   {
 // How do I create a Observable instance and return the 'cacheResult'...
 return ...
   }

   var observable = new BaseObservable<Bar>();
   _components.WithSsoToken(_configuration.SsoToken)
      .Get(@params)
      .Select(Map)
      .Subscribe(c =>
                     {
                          _cache.Add(@params, c);
                          observable.Publish(c);
                          observable.Completed();
                     }, exception =>
                     {
                        observable.Failed(exception);
                        observable.Completed();
                     });

       return observable;
}

Ответы [ 2 ]

3 голосов
/ 08 декабря 2010

Я полагаю, что вы ищете Observable.Return:

return Observable.Return((Bar)cachedResult);

На неродственной ноте:

  • Нет необходимости возвращать BaseObservable<T>. Вы должны вернуть Subject<T>, поскольку она делает то, что делает ваша реализация, но является поточно-ориентированной (вы также должны вызвать .AsObservable() для возвращаемого значения, которое не может быть возвращено).
  • Вы используете Do для добавления значения в кеш:

var observable = new Subject<Bar>();
_components.WithSsoToken(_configuration.SsoToken)
    .Get(@params)
    .Select(Map)
    .Subscribe(c =>
    {
        _cache.Add(@params, c);
        observable.OnNext(c);
        observable.OnCompleted();
    }, exception =>
    {
        observable.OnError(exception);
    });

return observable.AsObservable();
2 голосов
/ 09 декабря 2010

Удобно, я написал класс, который делает этот шаблон для вас, проверьте его:

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ObservableAsyncMRUCache.cs

var cache = new ObservableAsyncMRUCache<int, int>(
    x => Observable.Return(x*10).Delay(1000) /* Return an IObservable here */, 
    100 /*items to cache*/,
    5 /* max in-flight, important for web calls */
    );

IObservable<int> futureResult = cache.AsyncGet(10);

futureResult.Subscribe(Console.WriteLine);
>>> 100

Некоторые хитрые вещи, которые он обрабатывает правильно:

  • Кэширует последние n предметов и выбрасывает неиспользуемые предметы
  • Это гарантирует, что одновременно выполняется не более n элементов - если вы этого не сделаете, вы можете легко порождать тысячи веб-вызовов, если кэш пуст
  • Если вы запрашиваете один и тот же элемент два раза подряд, первый запрос инициирует запрос, а второй вызов будет ждать первого, а не порождать идентичный запрос, поэтому вы не будете в итоге запрашивать данные избыточно .
...