У нас есть ситуация, когда мы хотим запустить фоновую операцию опроса в приложении C #, которое периодически возвращает значения с использованием реактивных расширений. Процесс, который мы хотели бы реализовать, следующий:
- Вызывающая сторона вызывает метод, подобный
Poll()
, который возвращает IObservable
- Вызывающий абонент подписывается на указанное наблюдаемое, и он запускает фоновый поток / задачу, которая взаимодействует с оборудованием для получения значений в некоторый интервал
- Когда вызывающий абонент завершает свою работу, он удаляет подписку и автоматически останавливает фоновый поток / задачу
Попытка # 1
Чтобы попытаться доказать это, я написал следующее консольное приложение, но это не работает так, как я ожидал:
public class OutputParameters
{
public Guid Id { get; set; }
public int Value { get; set; }
}
public class Program
{
static void Main(string[] args)
{
Console.WriteLine("Requesting the polling operation");
var worker1 = Poll();
Console.WriteLine("Subscribing to start the polling operation");
var sub1 = worker1.Subscribe(
value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
() => { Console.WriteLine("Thread has completed"); });
Thread.Sleep(5000);
sub1.Dispose();
Console.ReadLine();
}
private static IObservable<OutputParameters> Poll()
{
return Observable.DeferAsync(Worker);
}
private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
Task.Run(async () =>
{
var id = Guid.NewGuid();
const int steps = 10;
try
{
for (var i = 1; i <= steps || token.IsCancellationRequested; i++)
{
Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
subject.OnNext(new OutputParameters { Id = id, Value = i });
// This will actually throw an exception if it's the active call when
// the token is cancelled.
//
await Task.Delay(1000, token);
}
}
catch (TaskCanceledException ex)
{
// Interestingly, if this is triggered because the caller unsibscribed then
// this is unneeded...the caller isn't listening for this error anymore
//
subject.OnError(ex);
}
if (token.IsCancellationRequested)
{
Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
}
else
{
Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
subject.OnCompleted();
}
}, token);
return Task.FromResult(subject.AsObservable());
}
}
Код выше, кажется, фактически отменяет фоновую задачу почти сразу, так как это вывод:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled
Попытка # 2
Затем я попытался внести небольшое изменение в метод Worker
, чтобы сделать его асинхронным, и ожидать вызова Task.Run
следующим образом:
private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
await Task.Run(async () =>
{
...what happens in here is unchanged...
}, token);
return subject.AsObservable();
}
В результате здесь создается впечатление, что фоновая задача имеет полный контроль, потому что она выполняется в течение примерно 5 секунд перед отменой, но не выводится из обратных вызовов подписки. Вот полный вывод:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled
Мой вопрос
Так что ясно, что я не до конца понимаю, что здесь происходит, или что использование DeferAsync
является правильным методом создания наблюдаемого в этом случае.
Есть ли правильный способ для реализации такого подхода?