Реактивные расширения против FileSystemWatcher - PullRequest
17 голосов
/ 20 апреля 2010

Одна из вещей, которые меня долго беспокоили в FileSystemWatcher - это то, как он запускает несколько событий для одного логического изменения файла. Я знаю, почему это происходит, но я не хочу беспокоиться - я просто хочу повторить анализ файла один раз, а не 4-6 раз подряд. В идеале, должно происходить событие, которое срабатывает только тогда, когда заданный файл изменяется, а не каждый шаг на этом пути.

На протяжении многих лет я придумывал различные решения этой проблемы, в разной степени безобразия. Я думал, что Reactive Extensions будет окончательным решением, но есть кое-что, что я не делаю правильно, и я надеюсь, что кто-то может указать на мою ошибку.

У меня есть метод расширения:

public static IObservable<IEvent<FileSystemEventArgs>> GetChanged(this FileSystemWatcher that)
{
    return Observable.FromEvent<FileSystemEventArgs>(that, "Changed");
}

В конечном счете, я хотел бы получить одно событие на имя файла в течение определенного периода времени, чтобы четыре события подряд с одним именем файла были сведены к одному событию, но я ничего не теряю, если несколько файлов изменены в то же время. BufferWithTime звучит как идеальное решение.

var bufferedChange = watcher.GetChanged()
    .Select(e => e.EventArgs.FullPath)
    .BufferWithTime(TimeSpan.FromSeconds(1))
    .Where(e => e.Count > 0)
    .Select(e => e.Distinct());

Когда я подписываюсь на это наблюдаемое, одно изменение отслеживаемого файла запускает мой метод подписки четыре раза подряд, что, скорее, противоречит цели. Если я удаляю вызов Distinct(), я вижу, что каждый из четырех вызовов содержит два одинаковых события - поэтому происходит некоторая буферизация. Увеличение TimeSpan, переданного до BufferWithTime, похоже, не имеет никакого эффекта - я поднялся до 20 секунд без каких-либо изменений в поведении.

Это мой первый набег в Rx, поэтому я, вероятно, упускаю что-то очевидное. Я делаю это неправильно? Есть ли лучший подход? Спасибо за любые предложения ...

Ответы [ 4 ]

9 голосов
/ 09 января 2012

Просто чтобы разогреть старую тему, так как сейчас я тоже над этим работаю:

Конечно, эта тема незначительна в контексте просмотра одного файла, поскольку FileSystemWatcher запускается каждые ~ 3 секунды с событием Changed для одного файла, когда вы отслеживаете размер с помощью

_fileSystemWatcher.NotifyFilter = NotifyFilters.Size | ....

Но давайте предположим, что FileSystemWatcher будет запускать много событий подряд (возможно, многие файлы будут изменены / переименованы / созданы), и другие люди читают это:

Вы не хотите использовать Throttle или BufferWithTime в этом случае: Throttle немного вводит в заблуждение .. он запрещает любое срабатывание до тех пор, пока не истечет время TimeSpan без события. Значение: оно никогда не может срабатывать, когда вы используете что-то вроде Throttle(TimeSpan.FromMilliseconds(200)), и после каждого события возникает пауза <200 мс. Так что это не совсем то, чего ожидают люди. Это хорошо для ввода пользователем, когда вы хотите подождать, пока пользователь не перестанет что-то печатать. Это плохо для регулирования нагрузки. </p>

BufferWithTime тоже не то, что вы хотите: оно просто заполняет буфер времени. Хорошо, когда у вас высокая начальная загрузка для каждого события, например, открытие соединения с веб-сервисом. В этом случае вы бы хотели обрабатывать события процесса каждые «временные» секунды. Но не при балансировке нагрузки, так как количество событий не меняется.

Решением является метод Sample(TimeSpan time): он принимает последнее событие в интервале времени, который является «настоящим» газом. Я думаю, что ребята из Rx действительно испортили название в этом случае.

4 голосов
/ 12 октября 2012

вы могли бы использовать group by для агрегирования событий файловой системы по имени файла и использовать полученную в результате наблюдаемую с помощью метода расширений Throttle. Я написал небольшой пример с использованием целых чисел, но основная идея та же.

var obs = from n in Enumerable.Range(1, 40).ToObservable()
    group n by n / 10 into g
    select new { g.Key, Obs = g.Throttle(TimeSpan.FromMilliseconds(10.0)) } into h
    from x in h.Obs
    select x;
obs.Subscribe(x => Console.WriteLine(x));

выходы:

9 
19 
29 
39 
40 

, что для каждой группы (n/10) - последнее наблюдаемое целое число.

3 голосов
/ 22 апреля 2010

BufferWithTime.Where (). Select (...) выполнит работу, но вы действительно хотите Throttle()

3 голосов
/ 21 апреля 2010

Моя ошибка. Каким-то образом у меня есть несколько FileSystemWatchers, контролирующих папки друг друга. Наблюдаемое срабатывало один раз для каждого наблюдателя, но, похоже, BufferWithTime работает правильно. Мне все еще нужно выяснить, почему мои наблюдатели запускают события для папок, которые, как мне показалось, были настроены на игнорирование, но это не имеет ничего общего с Rx или этим вопросом.

На самом деле, возможно, я смогу решить эту проблему и переключиться на использование одного наблюдателя для мониторинга родительской папки, используя Rx для фильтрации событий из папок, которые мне не интересны.

...