Создание неблокирующего наблюдаемого метода расширения, который возвращает элемент по умолчанию для пустой последовательности - PullRequest
2 голосов
/ 05 октября 2010

Представьте себе следующий оператор linq to observables:

var x = from result1 in service1.operation()
        from result2 in service2.operation()
        from result3 in service3.operation()
        select DoSomething()

x.Subscribe()

void Unit DoSomething() {
 ...
}

Все службы возвращают холодную наблюдаемую, поэтому они будут ждать завершения каждой из них, а затем вызывается DoSomething.

Теперьis service2.operation возвращает Observable.Empty, который в основном является уведомлением о неполном завершении, и это будет означать, что service3 никогда не будет вызываться и DoSomething также не будет.

Я хотел продолжить цепочку, если возвращается oncomplete, но предоставилзначение по умолчанию для результата2.Поэтому я создал метод расширения OnEmptyReturnDefault

public static IObservable<T> OnEmptyReturnDefault<T>(this IObservable<T> observable)
{
    var maybeReturnsSomething = observable.Memoize(); // Custom Lazy caching extension method
    var whenEmpty = from isEmpty in maybeReturnsSomething.IsEmpty()
                    where isEmpty
                    select default(T);
    var whenNotEmpty = from isEmpty in maybeReturnsSomething.IsEmpty()
                        where !isEmpty
                        from notEmpty in maybeReturnsSomething
                        select notEmpty;
    return whenEmpty.Merge(whenNotEmpty);
}

, позволяющий мне делать:

var x = from result1 in service1.operation()
            from result2 in service2.operation().OnEmptyReturnDefault()
            from result3 in service3.operation()
            select DoSomething()

Все хорошо, за исключением того, что мое решение блокирует.IsEmpty () по сути делает Take (1) .Count () == 0. Я хочу решение, которое не блокирует.

1 Ответ

1 голос
/ 08 октября 2010

Решение, предложенное на другом сайте, работает хорошо:

public static IObservable<T> DefaultIfEmpty<T>(this IObservable<T> src, T defaultValue)
{
  return src
    .Materialize()
    .Select((n, i) => (n.Kind == NotificationKind.OnCompleted && i == 0)
                ? new Notification<T>[]
                {
                  new Notification<T>.OnNext(defaultValue), 
                  new Notification<T>.OnCompleted()
                }
                : new[] {n})
    .SelectMany(ns => ns)
    .Dematerialize();
}
...