Допустимая настройка для Реактивного опроса данных в C # - PullRequest
0 голосов
/ 27 декабря 2018

Мне нужно реализовать периодический опрос некоторых данных из удаленного компонента.
Он должен опрашивать данные в следующий раз после того, как будет выполнен предыдущий опрос + подождите, скажем, 5сек .
Я придумалкод ниже, но позже выяснилось, что приложение выдает исключение при закрытии - в программе просмотра событий есть запись для 0xc00000fd, которая является исключением переполнения стека.
Стоит отметить, что это исключение происходит только тогда, когда приложение открыто и опрашивает данные в течение достаточно долгого времени.(для переполнения стека требуется время).
Все это приложение WPF, код которого приведен ниже ViewModel.

Я понимаю, почему в этом коде происходит исключение (вероятно, не следует вызывать OnNext в Subscribe), но как правильно реализовать?

_ctrlSubj = new Subject<ControllerInfo>();
_ctrlSubj.SelectMany(async _ => 
{
    // CurrentController is of type ControllerInfo
    // next line can take various amount of time
    var jobDetails = await Library.GetJobsAsync(CurrentController);
    return jobDetails;
})
.ObserveOnDispatcher()
.Subscribe(async e =>
{
    // Jobs is bound to View
    Jobs = new ObservableCollection<JobDetail>(jobDetails);

    await Task.Delay(TimeSpan.FromSeconds(5));
    _ctrlSubj.OnNext(CurrentController);
});

Ответы [ 2 ]

0 голосов
/ 28 декабря 2018

Извините @Shlomo, самый простой способ добиться этого заключается в следующем (это даст вам задержку при запуске до 5 секунд).

var jobs = Observable.Create<List<Job>>(async (observer, cancel) => {
   while(cancel.IsCancellationRequested == false)
   {
      try
      {
         var ret = await Library.GetJobsAsync(CurrentController);
         observer.OnNext(ret);
         await Task.Delay(5000, cancel);
      }
      catch(Exception ex)
      {
         observer.OnError(ex);
         return;
      }
   }
   observer.OnCompleted();

});

Если вместо этого вы хотите задержку при запуске и при запускеза 5 секунд вы можете обменять тело кода просто:

var delay = Task.Delay(5000, cancel);
var ret = await Library.GetJobsAsync(CurrentController);
observer.OnNext(ret);
await delay;

Вот код, переписанный в LinqPad "UnitTest" ... Вы можете подтвердить результаты.

void Main()
{
    var scheduler = new TestScheduler();

    var foo = Observable.Create<int>(async (observer, cancellationToken) => {
        while(!cancellationToken.IsCancellationRequested){
            var ret = await DoStuff(scheduler);
            observer.OnNext(ret);
            await Observable.Delay(
                Observable.Return(Unit.Default), 
                TimeSpan.FromSeconds(5), 
                scheduler)
            .ToTask();
        }
    });

    using(foo.Timestamp(scheduler).Subscribe(f => Console.WriteLine(f.Timestamp))){
        scheduler.AdvanceBy(TimeSpan.FromSeconds(120).Ticks);
    }

}

// Define other methods and classes here


public Task<int> DoStuff(IScheduler scheduler){
    return Observable.Delay(Observable.Return(1), TimeSpan.FromSeconds(1), scheduler).ToTask();
}

Вывод следующий:

01/01/0001 00:00:01 +00:00
01/01/0001 00:00:07 +00:00
01/01/0001 00:00:13 +00:00
01/01/0001 00:00:19 +00:00
01/01/0001 00:00:25 +00:00
01/01/0001 00:00:31 +00:00
01/01/0001 00:00:37 +00:00
01/01/0001 00:00:43 +00:00
01/01/0001 00:00:49 +00:00
01/01/0001 00:00:55 +00:00
01/01/0001 00:01:01 +00:00
01/01/0001 00:01:07 +00:00
01/01/0001 00:01:13 +00:00
01/01/0001 00:01:19 +00:00
01/01/0001 00:01:25 +00:00
01/01/0001 00:01:31 +00:00
01/01/0001 00:01:37 +00:00
01/01/0001 00:01:43 +00:00
01/01/0001 00:01:49 +00:00
01/01/0001 00:01:55 +00:00
0 голосов
/ 27 декабря 2018

2-е редактирование :

@ Ответ Арона напомнил мне о проблемах с несколькими подписками, которые есть у ответа ниже.Я рекомендую вспомогательную функцию IntervalAsync, которая выглядит следующим образом:

public static class RxExtensions
{
    public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period)
    {
        return IntervalAsync(f, period, Scheduler.Default);
    }

    public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period, IScheduler scheduler)
    {
        return Observable.Create<TResult>(o =>
        {
            var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
            var observable = q
                .Delay(t => Observable.Timer(t))
                .SelectMany(_ => f())
                .Do(t => q.OnNext(period));
            return observable.Subscribe(o);
        });
    }
}

, а окончательный код выглядит следующим образом:

var subscription = RxExtensions.IntervalAsync(
        () => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController)), 
        TimeSpan.FromSeconds(5)
    )
    .ObserveOnDispatcher()
    .Subscribe(i =>
    {
        Jobs = new ObservableCollection<JobDetail>(jobDetails);
    });

@ Ответ Арона работает.Я не нахожу это проще, потому что у вас больше микширования Rx-TPL, хотя я признаю, что «проще» в глазах смотрящего.


Первое редактирование : ( Не рекомендуется , имеется несколько ошибок подписки).

Ваша проблема действительна: с синхронным реактивнымpipe, Interval будет ждать завершения всего конвейера.Но с асинхронным конвейером Interval не выдержит.Таким образом, если первое асинхронное задание заняло 4,5 секунды, следующее задание начнется через 0,5 секунды после завершения первого.

Если вы хотите, чтобы задержка конца-начала была заданным временным интервалом, я думаю, что было бы лучше создать механизм очереди, аналогичный тому, который вы настроили.Я хотел бы сделать что-то похожее на это:

var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
var subscription = q
    .Delay(t => Observable.Timer(t))
    .SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
    .Subscribe(i =>
    {
        Jobs = new ObservableCollection<JobDetail>(jobDetails);
    });

Управление потоками и стеком Rx. Я думаю, что здесь работает лучше, чем TPL, и это не приведет к бесконечному стеку.Тем не менее, я не проверял.


Оригинальный ответ:

Это может сделать это, но я не могу проверить, потому что нет типов.

var subscription = Observable.Interval(TimeSpan.FromSeconds(5))
    .SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
    .ObserveOnDispatcher()
    .Subscribe(jobDetails =>
    {
        Jobs = new ObservableCollection<JobDetail>(jobDetails);
    });

Если ничего не получится, измените ваш ответ, включив mcve .


Код тестирования:

Это код, который я использую для проверки асинхронного микширования TPL / RX.Не полностью копирует среду @ IgorStack, потому что нет WPF (нет .ObserveOnDispatcher()):

var f = new Func<Task<int>>(async () => {
    await Task.Delay(TimeSpan.FromSeconds(1));
    return 3;
});

var scheduler = new EventLoopScheduler(); //or Scheduler.Default
var o1 = RxExtensions.IntervalAsync(() => Observable.FromAsync(() => f()), TimeSpan.FromSeconds(5), scheduler)
    .Timestamp();
var subscription1 = o1.Subscribe(i =>
    {
        Console.WriteLine("s1: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
    });
var subscription2 = o1.Subscribe(i =>
    {
        Console.WriteLine("s2: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
    });
...