Состояние гонки с RX с использованием Buffer и StartWith - PullRequest
0 голосов
/ 10 июля 2019

Я пытаюсь изначально загрузить содержимое файла журнала и поставить галочку на наблюдаемой переменной LogTailData[] - поэтому, если начальное содержимое файла составляет 20 тыс. Строк, первый тик должен быть массивом длиной 20 тыс.

.

После этого я хочу буферизовать обновления файла с помощью средства просмотра файлов и отмечать их при получении новой строки, буферизуя их с помощью RX.

Я использую StartWith() для загрузки исходногосодержимое файла, тогда Buffered Observable должен кормить обновления.

Однако проблема, с которой я сталкиваюсь, заключается в том, что если исходные данные большие (например, 20 тыс. Строк), то обновление файла, кажется, сначала ставится галочкой на наблюдаемой, а не перед большим массивом, предоставленным StartWith.

Мне интересно, если способ, которым работает Buffer(), означает, что если таймер истечет до того, как initialFileLines можно будет поставить на наблюдаемое, вы получите их не по порядку?

Iнужно гарантировать, что исходные данные будут отмечены первыми, независимо от размера набора данных.

Мой код

var initialFileLines = watcher.GetInitialData();

new TaskFactory().StartNew(() => watcher.StartFileWatcher(data => watcherSubject.OnNext(data), _cts.Token));

Stream = watcherSubject.Buffer(TimeSpan.FromMilliseconds(500), 100)
    .StartWith(initialFileLines)
    .Replay()
    .RefCount();
...