Я хочу построчно загружать содержимое файла в поток, а затем сидеть и наблюдать каждую секунду за новыми записями в файле - так что программа для чтения файлов в реальном времени передает данные в RX.
Я добился этого, читая по одной строке за раз и, если в readline () есть данные, для обратного вызова через переданное действие, которое в вызывающей стороне помещает данные в ReplaySubject для подписчиков RX.
Проблема состоит в том, что он отправляет только одну строку за раз в RX-потоке.Я хотел бы сгруппировать их, чтобы они не перезванивали, пока вы либо не скажете 10 пунктов для отправки, либо не пройдет определенное время - например, 5-10 секунд.
Мой обратный вызов - это набор данных, сейчас я жестко запрограммировал его, чтобы он возвращал только один элемент в коллекции, так как не могу понять, как выполнить пакетную обработку по времени.
Кто-нибудь может подсказать, как этого добиться?
Мой код пока
public void StartFileWatcher(Action<LogTailMessage[]> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
//todo - batch these up so we only call back once we have 10 items, or if a certain amount of time has passed, send what we have
callbackAction(new [] {new LogTailMessage(lineNumber, s)});
lineNumber++;
}
else
wh.WaitOne(1000);
}
}
}
Обновлено: Буферное решение
var watcherSubject = new ReplaySubject<LogTailMessage>();
var watcher = new LogFileWatcher(path, filename);
new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));
Stream = watcherSubject
.Buffer(TimeSpan.FromMilliseconds(500), 20)
.Where(d => d != null)
.Replay()
.RefCount();
И средство просмотра файлов
public void StartFileWatcher(Action<LogTailMessage> callbackAction, CancellationToken cancellationToken)
{
var wh = new AutoResetEvent(false);
var fsw = new FileSystemWatcher(_path)
{
Filter = _file,
EnableRaisingEvents = true
};
fsw.Changed += (s, e) => wh.Set();
var fileName = Path.Combine(_path, _file);
var startLine = GetFileStartLine(fileName);
var lineNumber = 1;
var fs = new FileStream(Path.Combine(_path, _file), FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
using (var sr = new StreamReader(fs))
{
while (!cancellationToken.IsCancellationRequested && !_isCancelled)
{
var s = sr.ReadLine();
if (s != null)
{
if (lineNumber >= startLine)
callbackAction(new LogTailMessage(lineNumber, s));
lineNumber++;
}
else
{
wh.WaitOne(1000);
}
}
}
}