Потребовалось некоторое время, чтобы решить это, но я наконец-то нашел свое собственное решение.
Я определил два метода расширения с именем FromCacheOrFetch
с этими сигнатурами:
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, R> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
where R : class
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, Maybe<R>> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
Первый использует стандартные типы CLR / Rx, а второй - монаду Maybe
(допускающие типы, не ограниченные типами значений).
Первый просто превращает Func<T, R>
в Func<T, Maybe<R>>
и вызывает второй метод.
Основная идея заключается в том, что при запросе источника кэш проверяется на предмет каждого значения, чтобы определить, существует ли результат и возвращается ли он немедленно. Однако, если какой-либо результат отсутствует, тогда и только тогда вызывается функция выборки, передавая Subject<T>
, и теперь все пропуски кэша передаются через функцию выборки. Код вызова отвечает за добавление результатов в кэш. Код асинхронно обрабатывает все значения через функцию выборки и собирает результаты вместе с кэшированными результатами в правильном порядке.
Работает как удовольствие. : -)
Вот код:
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
where R : class
{
return source
.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler)
{
var results = new Subject<R>();
var disposables = new CompositeDisposable();
var loop = new EventLoopScheduler();
disposables.Add(loop);
var sourceDone = false;
var pairsDone = true;
var exception = (Exception)null;
var fetchIn = new Subject<T>();
var fetchOut = (IObservable<R>)null;
var pairs = (IObservable<KeyValuePair<int, R>>)null;
var lookup = new Dictionary<T, int>();
var list = new List<Maybe<R>>();
var cursor = 0;
Action checkCleanup = () =>
{
if (sourceDone && pairsDone)
{
if (exception == null)
{
results.OnCompleted();
}
else
{
results.OnError(exception);
}
loop.Schedule(() => disposables.Dispose());
}
};
Action dequeue = () =>
{
while (cursor != list.Count)
{
var mr = list[cursor];
if (mr.HasValue)
{
results.OnNext(mr.Value);
cursor++;
}
else
{
break;
}
}
};
Action<KeyValuePair<int, R>> nextPairs = kvp =>
{
list[kvp.Key] = Maybe<R>.Something(kvp.Value);
dequeue();
};
Action<Exception> errorPairs = ex =>
{
fetchIn.OnCompleted();
pairsDone = true;
exception = ex;
checkCleanup();
};
Action completedPairs = () =>
{
pairsDone = true;
checkCleanup();
};
Action<T> sourceNext = t =>
{
var mr = cache(t);
list.Add(mr);
if (mr.IsNothing)
{
lookup[t] = list.Count - 1;
if (fetchOut == null)
{
pairsDone = false;
fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
pairs = fetchIn
.Select(x => lookup[x])
.Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
disposables.Add(pairs
.ObserveOn(loop)
.Subscribe(nextPairs, errorPairs, completedPairs));
}
fetchIn.OnNext(t);
}
else
{
dequeue();
}
};
Action<Exception> errorSource = ex =>
{
sourceDone = true;
exception = ex;
fetchIn.OnCompleted();
checkCleanup();
};
Action completedSource = () =>
{
sourceDone = true;
fetchIn.OnCompleted();
checkCleanup();
};
disposables.Add(source
.ObserveOn(loop)
.Subscribe(sourceNext, errorSource, completedSource));
return results.ObserveOn(scheduler);
}