Как запустить холодную наблюдаемую с помощью Task.Run () из параллельной библиотеки задач? - PullRequest
0 голосов
/ 26 июня 2018

У нас есть ситуация, когда мы хотим запустить фоновую операцию опроса в приложении C #, которое периодически возвращает значения с использованием реактивных расширений. Процесс, который мы хотели бы реализовать, следующий:

  1. Вызывающая сторона вызывает метод, подобный Poll(), который возвращает IObservable
  2. Вызывающий абонент подписывается на указанное наблюдаемое, и он запускает фоновый поток / задачу, которая взаимодействует с оборудованием для получения значений в некоторый интервал
  3. Когда вызывающий абонент завершает свою работу, он удаляет подписку и автоматически останавливает фоновый поток / задачу

Попытка # 1

Чтобы попытаться доказать это, я написал следующее консольное приложение, но это не работает так, как я ожидал:

public class OutputParameters
{
    public Guid Id { get; set; }
    public int Value { get; set; }
}

public class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Requesting the polling operation");
        var worker1 = Poll();

        Console.WriteLine("Subscribing to start the polling operation");

        var sub1 = worker1.Subscribe(
            value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
            ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
            () => { Console.WriteLine("Thread has completed"); });


        Thread.Sleep(5000);

        sub1.Dispose();

        Console.ReadLine();
    }


    private static IObservable<OutputParameters> Poll()
    {
        return Observable.DeferAsync(Worker);
    }


    private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        Task.Run(async () =>
        {
            var id = Guid.NewGuid();
            const int steps = 10;

            try
            {
                for (var i = 1; i <= steps || token.IsCancellationRequested; i++)
                {
                    Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
                    subject.OnNext(new OutputParameters { Id = id, Value = i });

                    // This will actually throw an exception if it's the active call when
                    //  the token is cancelled.
                    //
                    await Task.Delay(1000, token);
                }
            }
            catch (TaskCanceledException ex)
            {
                // Interestingly, if this is triggered because the caller unsibscribed then
                //  this is unneeded...the caller isn't listening for this error anymore
                //
                subject.OnError(ex);
            }

            if (token.IsCancellationRequested)
            {
                Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
            }
            else
            {
                Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
                subject.OnCompleted();
            }
        }, token);

        return Task.FromResult(subject.AsObservable());
    }
}

Код выше, кажется, фактически отменяет фоновую задачу почти сразу, так как это вывод:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled

Попытка # 2

Затем я попытался внести небольшое изменение в метод Worker, чтобы сделать его асинхронным, и ожидать вызова Task.Run следующим образом:

    private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        await Task.Run(async () =>
        {
            ...what happens in here is unchanged...
        }, token);

        return subject.AsObservable();
    }

В результате здесь создается впечатление, что фоновая задача имеет полный контроль, потому что она выполняется в течение примерно 5 секунд перед отменой, но не выводится из обратных вызовов подписки. Вот полный вывод:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled

Мой вопрос

Так что ясно, что я не до конца понимаю, что здесь происходит, или что использование DeferAsync является правильным методом создания наблюдаемого в этом случае.

Есть ли правильный способ для реализации такого подхода?

1 Ответ

0 голосов
/ 27 июня 2018

Это будет сделано, если будет достаточно решения только для RX. Гораздо чище, если вы спросите меня ...

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<Guid>(() => Observable.Return(Guid.NewGuid()))
        .SelectMany(id => 
            Observable.Generate(1, i => i <= steps, i => i + 1, i => i, _ => TimeSpan.FromMilliseconds(1000))
                .ObserveOn(new EventLoopScheduler())
                .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
                .Select(i => new OutputParameters { Id = id, Value = i })
        );
}

Объяснение:

  • Generate похоже на цикл for для Rx. Последний аргумент определяет, когда элементы выбрасываются. Это эквивалентно вашему циклу for + Task.Delay.
  • ObserveOn контролирует, где / когда наблюдается наблюдаемое. В этом случае EventLoopScheduler будет раскручивать один новый поток на каждого подписчика, и все элементы из этого наблюдаемого будут наблюдаться в новом потоке.

Из загадки:

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<OutputParameters>(() =>
    {
        var id = Guid.NewGuid();
        return Observable.Generate(1, i => i <= steps, i => i + 1, i => i,
                _ => TimeSpan.FromMilliseconds(1000), new EventLoopScheduler())
            .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
            .Select(i => new OutputParameters { Id = id, Value = i });
    });
}
...