.NET Rx - размер буфера ReplaySubject не работает - PullRequest
0 голосов
/ 31 января 2012

Я использую .NET Reactive Extensions для наблюдения за событиями в журнале по мере их поступления. В настоящее время я использую класс, производный от IObservable и использующий ReplaySubject для хранения журналов, таким образом я могу фильтровать и воспроизводить журналы.(например: Показать все журналы ошибок или все подробные журналы), не теряя буферизованные мной журналы.

Проблема в том, что я установил размер буфера для объекта:

this.subject = new ReplaySubject<LogEvent>(10);

Использование памяти моей программой выходит за рамки, когда я использую OnNext для добавления в наблюдаемую коллекцию в бесконечном цикле:

internal void WatchForNewEvents()
        {
            Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        dynamic parameters = new ExpandoObject();
                        // TODO: Add parameters for getting specific log events

                        if (this.logEventRepository.GetManyHasNewResults(parameters))
                        {
                            foreach (var recentEvent in this.logEventRepository.GetMany(parameters))
                            {
                                this.subject.OnNext(recentEvent);
                            }
                        }

                        // Commented this out for now to really see the memory go up 
                        // Thread.Sleep(1000); 
                    }
                });
        }

Имеет ли размер буфера на ReplaySubjectне работа?Кажется, он не очищает буфер при достижении размера буфера.Любая помощь высоко ценится!

ОБНОВЛЕНИЕ:

Я добавляю подписчиков, как это (Это неправильно?):

public IDisposable Subscribe(IObserver<LogEvent> observer)
        {
            return this.subject.Subscribe(observer);
        }

... которая называется как:

// Inserts into UI ListView
    this.logEventObservable.Subscribe(evt => this.InsertNewLogEvent(evt));

1 Ответ

0 голосов
/ 31 января 2012

Я не уверен, что это окончательный ответ, но я подозреваю, что вы столкнулись с проблемой из-за параллелизма вокруг используемого вами планировщика. Конструктор, который вы вызываете ReplaySubject, выглядит следующим образом:

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, Scheduler.CurrentThread)
{ }

Scheduler.CurrentThread беспокоит меня. Попробуйте изменить его на Scheduler.ThreadPool и посмотрите, поможет ли это.

Также, как примечание, вы, похоже, смешиваете Rx с TPL и спящим старомодным потоком. Обычно лучше этого не делать. Вы можете изменить свой WatchForNewEvents код, чтобы он выглядел так:

dynamic parameters = new ExpandoObject();

var newEvents =
    from n in Observable.Interval(TimeSpan.FromSeconds(1.0))
    where this.logEventRepository.GetManyHasNewResults(parameters)
    from recentEvent in
        this.logEventRepository.GetMany(parameters).ToObservable()
    select recentEvent;

newEvents.Subscribe(this.subject);

Это хороший компактный Rx-y способ ведения дел.

...