Почему код выдает ObjectDisposedException - PullRequest
0 голосов
/ 17 мая 2018
static void Main(string[] args)
{
    Observable.Using(() => new EventLoopScheduler(), els => Observable
            .Defer(() => Observable.Return(1))
            .SubscribeOn(els))
        .Subscribe();
    Console.ReadLine();
}

Почему приведенный выше код создаст исключение ObjectDisposedException?Как именно работает SubscribeOn?


Подробнее:

//can't compile
class DataService : ObservableBase<Unit>
{
    protected override IDisposable SubscribeCore(IObserver<Unit> o) 
    {
        return Observable.Defer(() => Observable.Start(() => _httpClient.Get(...)))
            .RepeatWithDelay(TimeSpan.FromSeconds(1))
            .ObserveOn(SchedulerEx.Current)//observe back to event loop
            .Do(...)
            .Select(_ => Unit.Default)
            .Subscribe(o)
    }
}
class Controller 
{
    void Start()
    {
        _instance = Observable.Using(
            () => SchedulerEx.Create(), 
            els => _dataService.SubscribeOn(els));
    }
    void Stop()
    {
        _instance.Dispose();
    }
}
class SchedulerEx 
{
    [ThreadStatic]
    public static EventLoopScheduler Current;
    public EventLoopScheduler Create() 
    {
        var els = new EventLoopScheduler();
        els.Schedule(() => SchedulerEx.Current = els);
        return els;
    } 
}
static void Main() 
{
    var controller = kernel.Get<Controller>();
    controller.Start();
    controller.Stop();//throw if stop immediately
}

Что я хочу реализовать: в приложении службы Windows у меня есть несколько DataService, независимо выполняющихся в своем собственном цикле событий,и я могу наблюдать за текущим циклом событий в любом месте.

Ответы [ 4 ]

0 голосов
/ 22 мая 2018

Просто попробуйте это:

var els = new EventLoopScheduler();
var subscription =
    Observable
        .Return(1)
        .Finally(() => els.Schedule(() => els.Dispose()))
        .ObserveOn(els)
        .Subscribe();

Console.ReadLine();
0 голосов
/ 17 мая 2018

Это довольно крендель:

  • Observable.Using похоже на оператор C # using: он располагает IDisposable после завершения наблюдения.
  • Observable.Defer задерживает запуск включенного кода до подписки подписчика.
  • SubscribeOn указывает, на какой поток должен быть подписан наблюдаемый объект на основе переданного в IScheduler.

Я проверил следующее, которое также взрывает:

static void Main(string[] args)
{
    Observable.Using(() => new EventLoopScheduler(), els => 
        Observable.Empty<int>()
            .SubscribeOn(els)
    )
        .Subscribe();
}

... поэтому Defer не оказывает влияния.

Очевидно, Using утилизирует планировщик перед его использованием. Если вы измените Observable.Empty (или Observable.Return) на что-то, что не завершится немедленно (например, Observable.Never или Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(2)), оно не будет разбомблено.

Похоже на ошибку состояния гонки в EventLoopScheduler: Вероятно, и Observable.Using, и EventLoopScheduler пытаются вызвать Dispose, и, если туда попадет неправильный, взорвать.

Я бы порекомендовал сбросить Using.

0 голосов
/ 18 мая 2018
Observable.Using(() => new EventLoopScheduler(), 
            els => new AnonymousObservable<long>(o => els.Schedule(Unit.Default, (_, s) => source.Subscribe(o))));

Наконец-то я решил проблему с помощью кода выше.

0 голосов
/ 17 мая 2018

Уверен, из-за .Using().Внутренне это (наиболее вероятно) продолжается на using () {} конструкции, которая неявно вызывает .Dispose() в конце самого себя.

...