Почему Subject <T>.HasObservers не гарантируется как true после SubscribeOn ()? - PullRequest
0 голосов
/ 16 февраля 2019

Subject.HasObservers не является истинным в примере кода, прикрепленного к неопределенному количеству тиков.Если я уберу SubscribeOn (), HasObservers всегда будет true, поэтому я знаю, что это связано с инициализацией IScheduler.

Это вызывало проблему в нашем производственном программном обеспечении, когда первые несколько вызовов OnNext () ни к чему не привели, несмотря на гарантию, что переменная подписки IDisposable была инициализирована до того, как потоку, вызвавшему OnNext (), было разрешено продолжить,Это ошибка в RX?

Каковы другие способы использования классов System.Reactive, чтобы гарантировать настройку подписки с планировщиком без опроса?

Я пробовал Subject.Synchronize (),но это не имело значения.

static void Main(string[] args)
{

    for (int i = 0; i < 100; i++)
    {
        var source = new Subject<long>();

        IDisposable subscription = source
            .SubscribeOn(ThreadPoolScheduler.Instance)
            .Subscribe(Console.WriteLine);

        // 0 and 668,000 ticks for subscription setup, but rarely 0.
        int iterations = 0;
        while (!source.HasObservers)
        {
            iterations++;
            Thread.SpinWait(1);
        }

        // Next line would rarely output to Console without while loop
        source.OnNext(iterations);
        subscription.Dispose();
        source.Dispose();
    }

}

Я ожидал, что Subject.HasObservers будет истинным без опроса.

Ответы [ 2 ]

0 голосов
/ 17 февраля 2019

Решение, которое я разработал на данный момент, и надеюсь, что кто-то может улучшить его:

public class SubscribedSubject<T> : ISubject<T>, IDisposable
{
    private readonly Subject<T> _subject = new Subject<T>();

    private readonly ManualResetEventSlim _subscribed = new ManualResetEventSlim();

    public bool HasObservers => _subject.HasObservers;

    public void Dispose() => _subject.Dispose();

    public void OnCompleted() => Wait().OnCompleted();

    public void OnError(Exception error) => Wait().OnError(error);

    public void OnNext(T value) => Wait().OnNext(value);

    public IDisposable Subscribe(IObserver<T> observer)
    {
        IDisposable disposable = _subject.Subscribe(observer);
        _subscribed.Set();
        return disposable;
    }

    private Subject<T> Wait()
    {
        _subscribed.Wait();
        return _subject;
    }
}

Пример использования:

using (var source = new SubscribedSubject<long>())
{
    using (source
        .SubscribeOn(ThreadPoolScheduler.Instance)
        .Subscribe(Console.WriteLine))
    {
        source.OnNext(42);
        Console.ReadKey();
    }
}
0 голосов
/ 16 февраля 2019

Как я понимаю, проблема в том, что ваша подписка выполнена асинхронно : вызов не заблокирован, поэтому реальная подписка будет сделана позже в другом потоке.

Я не сделалНе могу найти точный способ узнать, действительно ли подписка была получена (это может быть вообще невозможно).Если ваша проблема заключается в состязании между первой OnNext и подпиской, то, возможно, вам нужно преобразовать вашу Наблюдаемую в Наблюдаемую подключаемость, используя Replay() + Connect().Таким образом, вы гарантируете, что каждый подписчик получит точно такую ​​же последовательность.

using (var source = new Subject<long>())
{
    var connectableSource = source.Replay();
    connectableSource.Connect();
    using (var subscription = connectableSource
                    .SubscribeOn(ThreadPoolScheduler.Instance)
                    .Subscribe(Console.WriteLine))
    {
        source.OnNext(42); // outputs 42 always
        Console.ReadKey(false);
    }
}

В моем коде мне все еще нужно Console.ReadKey из-за гонки между подпиской, выполненной в другом потоке, и отменой подписки.

...