2-е редактирование :
@ Ответ Арона напомнил мне о проблемах с несколькими подписками, которые есть у ответа ниже.Я рекомендую вспомогательную функцию IntervalAsync
, которая выглядит следующим образом:
public static class RxExtensions
{
public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period)
{
return IntervalAsync(f, period, Scheduler.Default);
}
public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period, IScheduler scheduler)
{
return Observable.Create<TResult>(o =>
{
var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
var observable = q
.Delay(t => Observable.Timer(t))
.SelectMany(_ => f())
.Do(t => q.OnNext(period));
return observable.Subscribe(o);
});
}
}
, а окончательный код выглядит следующим образом:
var subscription = RxExtensions.IntervalAsync(
() => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController)),
TimeSpan.FromSeconds(5)
)
.ObserveOnDispatcher()
.Subscribe(i =>
{
Jobs = new ObservableCollection<JobDetail>(jobDetails);
});
@ Ответ Арона работает.Я не нахожу это проще, потому что у вас больше микширования Rx-TPL, хотя я признаю, что «проще» в глазах смотрящего.
Первое редактирование : ( Не рекомендуется , имеется несколько ошибок подписки).
Ваша проблема действительна: с синхронным реактивнымpipe, Interval
будет ждать завершения всего конвейера.Но с асинхронным конвейером Interval
не выдержит.Таким образом, если первое асинхронное задание заняло 4,5 секунды, следующее задание начнется через 0,5 секунды после завершения первого.
Если вы хотите, чтобы задержка конца-начала была заданным временным интервалом, я думаю, что было бы лучше создать механизм очереди, аналогичный тому, который вы настроили.Я хотел бы сделать что-то похожее на это:
var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
var subscription = q
.Delay(t => Observable.Timer(t))
.SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
.Subscribe(i =>
{
Jobs = new ObservableCollection<JobDetail>(jobDetails);
});
Управление потоками и стеком Rx. Я думаю, что здесь работает лучше, чем TPL, и это не приведет к бесконечному стеку.Тем не менее, я не проверял.
Оригинальный ответ:
Это может сделать это, но я не могу проверить, потому что нет типов.
var subscription = Observable.Interval(TimeSpan.FromSeconds(5))
.SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
.ObserveOnDispatcher()
.Subscribe(jobDetails =>
{
Jobs = new ObservableCollection<JobDetail>(jobDetails);
});
Если ничего не получится, измените ваш ответ, включив mcve .
Код тестирования:
Это код, который я использую для проверки асинхронного микширования TPL / RX.Не полностью копирует среду @ IgorStack, потому что нет WPF (нет .ObserveOnDispatcher()
):
var f = new Func<Task<int>>(async () => {
await Task.Delay(TimeSpan.FromSeconds(1));
return 3;
});
var scheduler = new EventLoopScheduler(); //or Scheduler.Default
var o1 = RxExtensions.IntervalAsync(() => Observable.FromAsync(() => f()), TimeSpan.FromSeconds(5), scheduler)
.Timestamp();
var subscription1 = o1.Subscribe(i =>
{
Console.WriteLine("s1: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
});
var subscription2 = o1.Subscribe(i =>
{
Console.WriteLine("s2: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
});