ReactiveUI: как реализовать периодически обновляемый ReactiveList - PullRequest
0 голосов
/ 03 мая 2018

Я новичок в мире реактивных расширений и все еще пытаюсь учиться.

Я занимаюсь разработкой приложения с сеткой данных, которая отображает некоторые запущенные процессы Windows и их использование памяти. Использование памяти каждым процессом должно часто обновляться, т. Е. Каждые 200 мс.

Требования

  • Когда флажок установлен
    • сетка данных должна быть заполнена процессами, а использование памяти обновляется с использованием таймера с интервалом 200 мс.
    • монитор (все должно быть сделано в фоновом потоке)

- если процесс завершился, его следует удалить из источника.

- если процесс запускается, его следует добавить к источнику

- файл для изменений

  • Когда флажок снят
    • вся деятельность монитора должна быть остановлена ​​
    • сетка данных очищена

Любая помощь будет принята с благодарностью! Примечания:

  • В прошлом я пробовал несколько подходов, таких как использование ObservableConcurrentDictionary в качестве ресурса и таймера для периодического обновления ресурса, но я столкнулся с проблемами (параллелизм, блокировка и т. Д.), Поэтому я хотел бы есть решение на основе Rx / ReactiveUI
  • Из-за технических ограничений я могу использовать только .NET Framework 4.0, Reactive-core.Net40

Обновление

ViewModel

private ReactiveList<IProcessModel> _processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
public ReactiveList<IProcessModel> Processes { get { return _processes; } }

public MainViewModel(IMonitorService monitorService)
{
   this.WhenAnyValue(vm => vm.ShowProcessesIsChecked).Subscribe((b) => DoShowProcesses(b));
}


private void DoShowProcesses(bool checkboxChecked)
{
    IDisposable timer;
    Processes.Clear();
    if (checkboxChecked)
    {
        //checkbox checked
        lock (Processes)
            Processes.AddRange(_monitorService.GetProcesses());
        timer = Observable.Timer(TimeSpan.FromMilliseconds(200.0))
            .Select(x =>
        {
            lock (Processes)
            {
                foreach (var process in Processes) //throws the 'Collection was modified; enumeration operation may not execute.'
                    process.UpdateMemory(); 

                return Processes.Where(p => p.ProcessObject.HasExited).ToList();
            }
        }).
        ObserveOnDispatcher()
        .Subscribe(processesExited =>
        {
            if (processesExited.Count() > 0)
            {
                lock (Processes)
                    Processes.RemoveAll(processesExited); //remove all processes that have exited
            }

        });
    }
    else
    {
        if (timer != null)
            timer.Dispose();
    }
}

Я создал новую тему

Оригинал

ViewModel

public class MainViewModel : ReactiveObject
{
    public ReactiveList<IProcessModel> Processes { get; private set; }
    IMonitorService _monitorService;

    public MainViewModel(IMonitorService monitorService)
    {
        _monitorService = monitorService;

        Processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };
        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(value => value == true) //checkbox checked
            .ObserveOn(Scheduler.Default) //raise notifications on thread-pool thread to keep UI responsive
            .Select((isChecked) =>
            {
                return monitorService.GetProcesses();
            })
            .ObserveOn(SynchronizationContext.Current)
            .Subscribe(processes => {
                Processes.AddRange(processes); }
            );
        //start the MonitorService with MonitorService.Start(Processes)
        //start a timer with an interval of 200ms --> at interval
        //- do UpdateMemory() foreach IProcessModel in Processes
        //- if ProcessObject.HasExited --> remove it from the collection source
        ;
        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(value => value == false) //checkbox unchecked
            .Subscribe((isChecked) =>
            {
                monitorService.Stop(); //this stops monitoring for starting processes and clears the Processes
            });
    }

    private bool _showProcessesIsChecked;

    public bool ShowProcessesIsChecked
    {
        get { return _showProcessesIsChecked; }
        set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
    }
}

Модель

public class ProcessModel : ProcessModelBase, IProcessModel
{

    public ProcessModel(Process process)
    {
        ProcessObject = process;
    }      

    public void UpdateMemory()
    {
        try
        {
            if (!ProcessObject.HasExited)
            {
                long mem = ProcessObject.PagedMemorySize64;
                ProcessObject.Refresh();
                if (mem != ProcessObject.PagedMemorySize64)
                    OnPropertyChanged(nameof(ProcessObject));
            }
        }
        catch (Exception)
        {
            //log it
        }
    }
}

Услуги

public class MonitorService : IMonitorService
{
    ManagementEventWatcher managementEventWatcher;
    ReactiveList<IProcessModel> _processes;

    public List<IProcessModel> GetProcesses()
    {
        List<IProcessModel> processes = new List<IProcessModel>();

        foreach (var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome")))
            processes.Add(new ProcessModel(process));
        return processes;
    }

    /// <summary>
    /// Starts the manager. Monitor a starting process and changes in log file
    /// </summary>
    /// <param name="processes"></param>
    public void Start(ReactiveList<IProcessModel> processes)
    {
        _processes = processes;

        var qStart = "SELECT * FROM Win32_ProcessStartTrace WHERE ProcessName like 'chrome'";
        managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart));
        managementEventWatcher.EventArrived += new EventArrivedEventHandler(OnProcessStarted);
        try
        {
            managementEventWatcher.Start();
        }
        catch (Exception)
        {
            //log it
        }
        Task.Factory.StartNew(() => MonitorLogFile());
    }


    public void Stop()
    {
        if (managementEventWatcher != null)
            managementEventWatcher.Stop();
        if (_processes != null)
            _processes.Clear();
    }

    private void MonitorLogFile()
    {
        //this code monitors a log file for changes. It is possible that the IsChecked property of a ProcessModel object is set in the Processes collection
    }


    private void OnProcessStarted(object sender, EventArrivedEventArgs e)
    {

        try
        {
            Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value));
            _processes.Add(new ProcessModel(process));
        }
        catch (ArgumentException)
        {
            //log it
        }
        catch (InvalidOperationException)
        {
            //log it
        }

    }
}

XAML

<CheckBox Content='Show Processes' IsChecked='{Binding ShowProcessesIsChecked}' />
<DataGrid  ItemsSource="{Binding Processes}">
    <DataGrid.Resources>
      <DataGridTemplateColumn Header='Process'
                              x:Key='dgProcessName'
                              IsReadOnly='True'
                              x:Shared='False'>
        <DataGridTemplateColumn.CellTemplate>
            <DataTemplate>
                <StackPanel Orientation='Horizontal' VerticalAlignment='Center'>
                    <CheckBox IsChecked='{Binding IsChecked, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}' HorizontalAlignment='Stretch' VerticalAlignment='Stretch'> </CheckBox>
                    <TextBlock Text='{Binding ProcessObject.ProcessName}' />
                </StackPanel>
            </DataTemplate>
        </DataGridTemplateColumn.CellTemplate>
          </DataGridTemplateColumn>
          <DataGridTextColumn Header="PID"
                              Binding="{Binding ProcessObject.Id}"
                              IsReadOnly='True'
                              x:Key='dgPID'
                              x:Shared='False' />
          <DataGridTextColumn Header="Commit Size"
                              Binding='{Binding ProcessObject.PagedMemorySize64}'
                              IsReadOnly='True'
                              x:Key='dgCommitSize'
                              x:Shared='False' />
    </DataGrid.Resources>
</DataGrid>

Ответы [ 2 ]

0 голосов
/ 04 мая 2018

Я все равно хотел практиковать свои навыки Rx, поэтому я решил создать проект WPF и дать ему шанс. Я получил это работает, поэтому я поделюсь, как я поступил об этом.

  • Удалить список процессов из MonitorService. Это поможет изолировать источник изменений списка, упрощая нашу жизнь при отладке. Это также сужает обязанности MonitorService до предоставления первоначального списка и внесения изменений.

  • Мы уже «реагируем», поэтому мы можем также превратить событие EventArrived в наблюдаемое, используя FromEventPattern . Затем мы можем преобразовать эти запущенные события в ProcessModels и отправить их подписчику (ам).

  • Я переместил создание ManagementEventWatcher в конструктор, поэтому нам не нужно его заново создавать каждый раз, когда флажок установлен. Теперь методы Start / Stop являются просто оболочками для версий _managementEventWatcher.

.

public class MonitorService
{
    ManagementEventWatcher _managementEventWatcher;

    public IObservable<ProcessModel> NewProcessObservable { get; }

    public MonitorService()
    {
        var qStart = "SELECT * FROM Win32_ProcessStartTrace where ProcessName='chrome.exe'";
        _managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart));
        var eventArrivedObservable = Observable.FromEventPattern<EventArrivedEventHandler, EventArrivedEventArgs>(
            x => _managementEventWatcher.EventArrived += x,
            x => _managementEventWatcher.EventArrived -= x);

        NewProcessObservable = eventArrivedObservable
            .Select(x => GetProcessModel(x.EventArgs))
            .Where(x => x != null);
    }

    public List<ProcessModel> GetProcesses()
    {
        List<ProcessModel> processes = new List<ProcessModel>();

        foreach(var process in Process.GetProcesses().Where(p => p.ProcessName.Contains("chrome")))
            processes.Add(new ProcessModel(process));
        return processes;
    }

    public void Start()
    {
        try
        {
            _managementEventWatcher.Start();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

    public void Stop()
    {
        if(_managementEventWatcher != null)
            _managementEventWatcher.Stop();
    }

    private ProcessModel GetProcessModel(EventArrivedEventArgs e)
    {
        ProcessModel model = null;
        try
        {
            Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value));
            model = new ProcessModel(process);
        }
        catch(ArgumentException)
        {
            //log it
        }
        catch(InvalidOperationException)
        {
            //log it
        }

        return model;
    }
}

MainViewModel

  • Во избежание ошибок, таких как 'Коллекция была изменена; операция перечисления может не выполняться '; простое решение состоит в том, чтобы выполнить итерацию в обратном направлении, используя цикл for .

  • Сразу же перед каждым вызовом подписки используйте .ObserveOnDispatcher () , чтобы вызовы onNext выполнялись в потоке пользовательского интерфейса. Крутая часть в том, что only место, где мы модифицируем список, теперь находится внутри подписки. Итак, нам не нужно использовать замки и все такое.

  • Я разделил логику на 3 разные подписки: Запуск / остановка мониторинга, Обновление использования памяти и удаление процессов, которые вышли из (использовался шаблон, предложенный @ Enigmativity), и add недавно запущенные процессы в нашем реактивном списке. Надеемся, что это облегчает следование логике.

.

public class MainViewModel : ReactiveObject
{
    public ReactiveList<ProcessModel> Processes { get; private set; }
    MonitorService _monitorService;

    public MainViewModel(MonitorService monitorService)
    {
        _monitorService = monitorService;

        Processes = new ReactiveList<ProcessModel>() { ChangeTrackingEnabled = true };
        RxApp.SupportsRangeNotifications = false;

        IObservable<bool> checkboxObservable = this.WhenAnyValue(vm => vm.ShowProcessesIsChecked);
        IObservable<long> intervalObservable = Observable.Interval(TimeSpan.FromMilliseconds(200.0));

        // Start/stop the monitoring.
        checkboxObservable
            // Skip the default unchecked state.
            .Skip(1)
            .ObserveOnDispatcher()
            .Subscribe(
                isChecked =>
                {
                    if(isChecked)
                    {
                        Processes.AddRange(monitorService.GetProcesses());
                        monitorService.Start();
                    }
                    else
                    {
                        Processes.Clear();
                        monitorService.Stop();
                    }
                });

        // Update the memory usage and remove processes that have exited.
        checkboxObservable
            .Select(isChecked => isChecked ? intervalObservable : Observable.Never<long>())
            // Switch disposes of the previous internal observable
            // (either intervalObservable or Never) and "switches" to the new one.
            .Switch()
            .ObserveOnDispatcher()
            .Subscribe(
                _ =>
                {
                    // Loop backwards in a normal for-loop to avoid the modification errors.
                    for(int i = Processes.Count - 1; i >= 0; --i)
                    {
                        if(Processes[i].ProcessObject.HasExited)
                        {
                            Processes.RemoveAt(i);
                        }
                        else
                        {
                            Processes[i].UpdateMemory();
                        }
                    }
                });

        // Add newly started processes to our reactive list.
        monitorService.NewProcessObservable
            .Where(_ => ShowProcessesIsChecked)
            .ObserveOnDispatcher()
            .Subscribe(process => Processes.Add(process));
    }

    private bool _showProcessesIsChecked;

    public bool ShowProcessesIsChecked
    {
        get { return _showProcessesIsChecked; }
        set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
    }
}

Подробнее Если это единственная страница / окно в вашем приложении, то этого достаточно. Если нет, у вас есть немного больше работы, чтобы избежать утечек памяти. В последнем случае я рекомендую Google выполнить поиск "ReactiveUI WhenActivation" (и, возможно, даже добавить туда wpf). Вы найдете много примеров, которые вы можете использовать и учиться.

0 голосов
/ 03 мая 2018

Вы в основном хотите использовать этот тип шаблона:

IObservable<bool> checkBoxChecked = /* your checkbox observable here */
IObservable<long> timer = Observable.Interval(TimeSpan.FromMilliseconds(200.0));

IObservable<long> query =
    checkBoxChecked
        .Select(x => x ? timer : Observable.Never<long>().StartWith(-1L))
        .Switch();

IDisposable subscription =
    query
        .Subscribe(n =>
        {
            if (n == -1L)
            {
                // Clear UI
            }
            else
            {
                // Update UI
            }
        });

Это переключает между запущенным и неработающим в зависимости от значения флажка.

Вам необходимо убедиться, что вы наблюдаете в потоке пользовательского интерфейса, но, кроме незначительных настроек, это должно работать нормально.

...