Пакетное средство чтения файлового потока для отправки в RX - PullRequest
1 голос
/ 09 июля 2019

Я хочу построчно загружать содержимое файла в поток, а затем сидеть и наблюдать каждую секунду за новыми записями в файле - так что программа для чтения файлов в реальном времени передает данные в RX.

Я добился этого, читая по одной строке за раз и, если в readline () есть данные, для обратного вызова через переданное действие, которое в вызывающей стороне помещает данные в ReplaySubject для подписчиков RX.

Проблема состоит в том, что он отправляет только одну строку за раз в RX-потоке.Я хотел бы сгруппировать их, чтобы они не перезванивали, пока вы либо не скажете 10 пунктов для отправки, либо не пройдет определенное время - например, 5-10 секунд.

Мой обратный вызов - это набор данных, сейчас я жестко запрограммировал его, чтобы он возвращал только один элемент в коллекции, так как не могу понять, как выполнить пакетную обработку по времени.

Кто-нибудь может подсказать, как этого добиться?

Мой код пока

public void StartFileWatcher(Action<LogTailMessage[]> callbackAction, CancellationToken cancellationToken)
        {
            var wh = new AutoResetEvent(false);
            var fsw = new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            };
            fsw.Changed += (s, e) => wh.Set();

            var lineNumber = 1;
            var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            using (var sr = new StreamReader(fs))
            {
                while (!cancellationToken.IsCancellationRequested && !_isCancelled)
                {
                    var s = sr.ReadLine();
                    if (s != null)
                    {
                        //todo - batch these up so we only call back once we have 10 items, or if a certain amount of time has passed, send what we have
                        callbackAction(new [] {new LogTailMessage(lineNumber, s)});
                        lineNumber++;
                    }
                    else
                        wh.WaitOne(1000);
                }
            }
        }

Обновлено: Буферное решение

var watcherSubject = new ReplaySubject<LogTailMessage>();

            var watcher = new LogFileWatcher(path, filename);

            new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));

            Stream = watcherSubject
                .Buffer(TimeSpan.FromMilliseconds(500), 20)
                .Where(d => d != null)
                .Replay()
                .RefCount();

И средство просмотра файлов

public void StartFileWatcher(Action<LogTailMessage> callbackAction, CancellationToken cancellationToken)
        {
            var wh = new AutoResetEvent(false);
            var fsw = new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            };
            fsw.Changed += (s, e) => wh.Set();

            var fileName = Path.Combine(_path, _file);

            var startLine = GetFileStartLine(fileName);

            var lineNumber = 1;
            var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            using (var sr = new StreamReader(fs))
            {
                while (!cancellationToken.IsCancellationRequested && !_isCancelled)
                {
                    var s = sr.ReadLine();
                    if (s != null)
                    {
                        if (lineNumber >= startLine)
                            callbackAction(new LogTailMessage(lineNumber, s));

                        lineNumber++;
                    }
                    else
                    {
                        wh.WaitOne(1000);
                    }
                }
            }
        }

Ответы [ 2 ]

1 голос
/ 10 июля 2019

Вы много работаете над своим исходным кодом, который вам действительно не нужен, и вы создаете одноразовые и обработчики событий, которые не очищаются с помощью.

Вы действительно можете сделать все это за пару наблюдаемых.

Для начала нужно следить за изменениями в файле. вот как:

IObservable<Unit> fileSystemWatcherChanges =
    Observable
        .Using(() =>
            new FileSystemWatcher(_path)
            {
                Filter = _file,
                EnableRaisingEvents = true
            },
            fsw =>
                Observable
                    .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Changed += h, h => fsw.Changed -= h)
                    .Select(x => Unit.Default));

Теперь вам нужно открывать поток и читать из него каждый раз, когда файл изменяется:

IObservable<LogTailMessage> messages =
    Observable
        .Using(
            () => new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite),
            fs =>
                Observable
                    .Using(
                        () => new StreamReader(fs),
                        sr =>
                            fileSystemWatcherChanges
                                .StartWith(Unit.Default)
                                .Select(x =>
                                    Observable
                                        .Defer(() => Observable.FromAsync(() => sr.ReadLineAsync()))
                                        .Repeat()
                                        .TakeUntil(w => w == null))
                                .Merge()
                                .Where(w => w != null)))
        .Select((x, n) => new LogTailMessage(n, x));

IObservable<IList<LogTailMessage>> buffered =
    messages
        .Buffer(TimeSpan.FromSeconds(5), 10);

Я проверил это на своем компьютере и считаю, что он дает нужные вам результаты.

Это полный конвейер Rx, так что если вы подпишетесь как IDisposable subscription = buffered.Subscribe();, а потом позвоните subscription.Dispose();, тогда все будет очищено после себя.

И это избегает предметов.

0 голосов
/ 09 июля 2019

Вы можете использовать Buffer по теме:

var subject = ReplaySubject<LogTailMessage>();
StartFileWatcher(a => a.ToList().ForEach(ltm => subject.OnNext(ltm)), CancellationToken.None);
bufferedSubject = subject.Buffer (TimeSpan.FromSeconds(5), 10);
...