Вызов дорогой функции Reactive Extensions `IObservable` только тогда, когда значения не находятся в локальном кэше - PullRequest
2 голосов
/ 21 апреля 2011

Я использую Reactive Extensions (Rx) и шаблон репозитория для облегчения получения данных из относительно медленного источника данных.У меня есть следующий (упрощенный) интерфейс:

public interface IStorage
{
    IObservable<INode> Fetch(IObservable<Guid> ids);
}

Создание экземпляра реализации IStorage идет медленно - подумайте о создании веб-службы или соединения с БД.Каждый Guid в ids наблюдаемой приводит к однозначному INode (или null) в наблюдаемой отдаче, и каждый результат дорогой.Следовательно, для меня имеет смысл только создавать экземпляр IStorage, только если у меня есть хотя бы одно значение для выборки, а затем использовать IStorage для выборки только значений один раз для каждого Guid.

Для ограничениявызовы IStorage Я кеширую результаты в своем классе Repository, который выглядит следующим образом:

public class Repository
{
    private Dictionary<Guid, INode> NodeCache { get; set; }

    private Func<IStorage> StorageFactory { get; set; }

    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        var lazyStorage = new Lazy<IStorage>(this.StorageFactory);

        // from id in ids
        // if NodeCache contains id select NodeCache[id]
        // else select node from lazyStorage.Value.Fetch(...)
    }
}

В методе Repository.Fetch(...) я включил комментарии, указывающие, что я пытаюсь сделать.

По сути, если NodeCache содержит все извлекаемые идентификаторы, то IStorage никогда не создается, и результаты возвращаются практически без задержки.Однако, если какой-либо один идентификатор не находится в кэше, то создается экземпляр IStorage, и все неизвестные идентификаторы передаются с помощью метода IStorage.Fetch(...).

Отображение один-к-одному, включая сохранение порядка,нужно поддерживать.

Есть идеи?

Ответы [ 2 ]

1 голос
/ 28 июня 2011

Потребовалось некоторое время, чтобы решить это, но я наконец-то нашел свое собственное решение.

Я определил два метода расширения с именем FromCacheOrFetch с этими сигнатурами:

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, R> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, Maybe<R>> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)

Первый использует стандартные типы CLR / Rx, а второй - монаду Maybe (допускающие типы, не ограниченные типами значений).

Первый просто превращает Func<T, R> в Func<T, Maybe<R>> и вызывает второй метод.

Основная идея заключается в том, что при запросе источника кэш проверяется на предмет каждого значения, чтобы определить, существует ли результат и возвращается ли он немедленно. Однако, если какой-либо результат отсутствует, тогда и только тогда вызывается функция выборки, передавая Subject<T>, и теперь все пропуски кэша передаются через функцию выборки. Код вызова отвечает за добавление результатов в кэш. Код асинхронно обрабатывает все значения через функцию выборки и собирает результаты вместе с кэшированными результатами в правильном порядке.

Работает как удовольствие. : -)

Вот код:

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class
{
    return source
        .FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
{
    var results = new Subject<R>();

    var disposables = new CompositeDisposable();

    var loop = new EventLoopScheduler();
    disposables.Add(loop);

    var sourceDone = false;
    var pairsDone = true;
    var exception = (Exception)null;

    var fetchIn = new Subject<T>();
    var fetchOut = (IObservable<R>)null;
    var pairs = (IObservable<KeyValuePair<int, R>>)null;

    var lookup = new Dictionary<T, int>();
    var list = new List<Maybe<R>>();
    var cursor = 0;

    Action checkCleanup = () =>
    {
        if (sourceDone && pairsDone)
        {
            if (exception == null)
            {
                results.OnCompleted();
            }
            else
            {
                results.OnError(exception);
            }
            loop.Schedule(() => disposables.Dispose());
        }
    };

    Action dequeue = () =>
    {
        while (cursor != list.Count)
        {
            var mr = list[cursor];
            if (mr.HasValue)
            {
                results.OnNext(mr.Value);
                cursor++;
            }
            else
            {
                break;
            }
        }
    };

    Action<KeyValuePair<int, R>> nextPairs = kvp =>
    {
        list[kvp.Key] = Maybe<R>.Something(kvp.Value);
        dequeue();
    };

    Action<Exception> errorPairs = ex =>
    {
        fetchIn.OnCompleted();
        pairsDone = true;
        exception = ex;
        checkCleanup();
    };

    Action completedPairs = () =>
    {
        pairsDone = true;
        checkCleanup();
    };

    Action<T> sourceNext = t =>
    {
        var mr = cache(t);
        list.Add(mr);
        if (mr.IsNothing)
        {
            lookup[t] = list.Count - 1;
            if (fetchOut == null)
            {
                pairsDone = false;
                fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
                pairs = fetchIn
                    .Select(x => lookup[x])
                    .Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
                disposables.Add(pairs
                    .ObserveOn(loop)
                    .Subscribe(nextPairs, errorPairs, completedPairs));
            }
            fetchIn.OnNext(t);
        }
        else
        {
            dequeue();
        }
    };

    Action<Exception> errorSource = ex =>
    {
        sourceDone = true;
        exception = ex;
        fetchIn.OnCompleted();
        checkCleanup();
    };

    Action completedSource = () =>
    {
        sourceDone = true;
        fetchIn.OnCompleted();
        checkCleanup();
    };

    disposables.Add(source
        .ObserveOn(loop)
        .Subscribe(sourceNext, errorSource, completedSource));

    return results.ObserveOn(scheduler);
}
0 голосов
/ 21 апреля 2011

Примерно так (я предположил, что вы хотите создать экземпляр хранилища только один раз для всех подписчиков):

public class Repository
{
    public Repository()
    {
        _lazyStorage = new Lazy<IStorage>(StorageFactory);
    }
    private readonly Lazy<IStorage> _lazyStorage;
    private Dictionary<Guid, INode> NodeCache { get; set; }
    private Func<IStorage> StorageFactory { get; set; }
    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        return Observable
            .CreateWithDisposable<INode>(observer =>
                ids.Subscribe(x =>
                {
                    INode node;
                    if (NodeCache.TryGetValue(x, out node))
                        observer.OnNext(node);
                    else
                    {
                        node = _lazyStorage.Value.Fetch(x);
                        NodeCache[x] = node;
                        observer.OnNext(node);
                    }
                }, observer.OnError, observer.OnCompleted));
    }
}

РЕДАКТИРОВАТЬ: Хм, это сохранение порядка при IStorage. Выбор асинхронный интересен - ожидание IStorage.Извлечение должно блокировать все будущие значения ... Мышление ...

Думаю, я понял ... Может быть ... Если вам нужно сохранить порядок, вам нужна очередь.В мире RX очередь - .Concat.Будет ли работать ниже для вас?

public class Repository
{
    public Repository()
    {
        _lazyStorage = new Lazy<IStorage>(StorageFactory);
    }
    private readonly Lazy<IStorage> _lazyStorage;
    private Dictionary<Guid, INode> NodeCache { get; set; }
    private Func<IStorage> StorageFactory { get; set; }
    private IObservable<INode> Fetcher(Guid id)
    {
        return Observable.Defer(() =>
        {
            INode node;
            return NodeCache.TryGetValue(id, out node)
                ? Observable.Return(node)
                : _lazyStorage.Value.Fetch(id).Do(x => NodeCache[id] = x);
        });
    }
    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        return ids.Select(Fetcher).Concat();
    }
}
...