Проблема с обратным давлением в rx.net - PullRequest
1 голос
/ 03 июля 2019

Я хотел прочитать строки из CSV-файла и использовать RX.Net для некоторого преобразования, и я хотел сделать пакетное обновление и отправлять обновление каждые 250 миллисекунд

public static IEnumerable<string> ReadCSV(string filePath)
{
    var reader = new StreamReader(File.OpenRead(filePath));
    while (!reader.EndOfStream)
    {
        var line = reader.ReadLine();
        yield return line;
    }
}

var rows = ReadCSV("filePath").ToObservable();

rows
    .Buffer(50)
    .Zip(Observable.Interval(
        TimeSpan.FromMilliseconds(250)), (res, _) => res)
    .Subscribe(lines =>
        {
            //do something
        });

Я использую CSV-файл размером около 80 МБ, но консольный проект идет до 1 ГБ.

То, что здесь происходит, - Zip ждет, когда обе последовательности подадут ему сигнал. Последовательность csv очень быстро передает данные, поэтому хранит пакетные обновления в памяти и ожидает другую последовательность.

Что еще хуже, память не освобождается, даже когда все обновления обрабатываются. Если я удаляю Zip, память выглядит очень хорошо, похоже, что она высвобождает память во время обработки пакета (все приложение занимает всего около 20 МБ всего времени).

Два вопроса

  1. Есть ли способ сообщить наблюдаемой, что я хочу приостановить чтение, пока не будет обработано предыдущее (в моем случае это буферизованные строки).

  2. Почему память не освобождается после обработки всех обновлений, есть ли способ избежать этого?

Ответы [ 2 ]

0 голосов
/ 13 июля 2019

Мне не удалось воссоздать вашу проблему с использованием памяти.Я использовал файл 50 МБ.Однако я полагаю, что отчасти ваша проблема в том, что .ToObservable () извлекает данные из IEnumerable как можно быстрее.

Так почему бы просто не замедлить работу IEnumerable, скорость которого вы извлекаете из данных?диск, методом расширения?

(оператор .Buffer() для IEnumerable, используемый в примере, доступен в Ix.Net ).

Примерно так:

ReadCSC()
.Buffer(50)
.SlowDown(250)
.ToObservable() etc.
...

public static IEnumerable<IList<string>> SlowDown(this IEnumerable<IList<string>> source, int milliSeconds)
{
    foreach(var item in source)
    {
        yield return item;
        Thread.Sleep(milliSeconds);
    }
}

(В C # 8 можно будет сделать этот метод асинхронным и использовать Task.Delay вместо Thread.Sleep, чтобы не блокировать поток).

Этокак ваши данные читаются с диска с меньшей скоростью.Если это решит проблему с памятью, я не знаю.

0 голосов
/ 03 июля 2019

Мне удалось найти решение для вопроса 1.

rows
    .Buffer(50)
    .Select(lines =>
    {
        Thread.Sleep(250);
        return lines;
    }
    .Subscribe(lines =>
        {
            //do something
        });

Весь процесс синхронизирован, поэтому, когда я делаю Thread.Sleep наблюдаемого также прекратить чтение данных.

Хотя это может быть не очень хороший ответ.

...