Как условно буферизовать в RX - PullRequest
0 голосов
/ 10 июля 2019

Я пытаюсь загрузить файл журнала в виде потока, чтобы я мог получить работающий в реальном времени журнал, работающий через Интернет.

Он должен загрузить исторические строки на дату из файла журнала, а затем запускать обновления для каждой новой строки, которая будет сохранена.

Чтобы уменьшить трафик signalR, я собираю их по 100 строк с использованием буфера RX, однако это проблема при загрузке исходного содержимого файла - которое может быть 100k строк. Загрузка этого по 100 штук слишком медленная. Исходное содержимое файла должно быть отправлено в виде одного сообщения.

Что я действительно хочу, так это отправить один тик в Observable сначала со всем содержимым файла до настоящего времени, а затем с этого момента, с помощью буферизованных обновлений для записи новой строки. Но я не уверен, как получить исходный контент в виде одного тика, а затем буферизировать с этого момента.

Мой код пока что

var watcherSubject = new ReplaySubject<LogTailMessage>()
var watcher = new logFileWatcher(logFileLocation)
new TaskFactory().StartNew(() => watcher.StartFileWatch(data => watcherSubject.OnNext(data), CancellationToken.None));

Stream = watcherSubject
    .Buffer(TimeSpan.FromMilliseconds(500), 100)
    .Where(d => d != null)
    .Replay()
    .RefCount();

Обновленное решение

var initialFileLines = watcher.GetInitialData();

new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));

Stream = watcherSubject.Buffer(TimeSpan.FromMilliseconds(500), 100)
    .StartWith(initialFileLines)
    .Replay()
    .RefCount();

1 Ответ

0 голосов
/ 10 июля 2019

Использование StartWith:

var originalFileLines = new List<LogTailMessage>(); //Initialize with file contents.

Stream = watcherSubject
    .Buffer(TimeSpan.FromMilliseconds(500), 100)
    .Where(d => d != null)
    .StartWith(originalFileLines)
    .Replay()
    .RefCount();

Обновление : я не уверен, почему StartWith не работает надежно.Можете ли вы изменить ответ с помощью смоделированного примера?

.Concat должен работать, хотя я думаю, что это в основном то, что StartWith должен делать .:

Stream = Observable.Return(originalFileLines).Concat(
    watcherSubject
        .Buffer(TimeSpan.FromMilliseconds(500), 100)
        .Where(d => d != null)
        .Replay()
        .RefCount()
   );
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...