Я хотел прочитать строки из CSV-файла и использовать RX.Net для некоторого преобразования, и я хотел сделать пакетное обновление и отправлять обновление каждые 250 миллисекунд
public static IEnumerable<string> ReadCSV(string filePath)
{
var reader = new StreamReader(File.OpenRead(filePath));
while (!reader.EndOfStream)
{
var line = reader.ReadLine();
yield return line;
}
}
var rows = ReadCSV("filePath").ToObservable();
rows
.Buffer(50)
.Zip(Observable.Interval(
TimeSpan.FromMilliseconds(250)), (res, _) => res)
.Subscribe(lines =>
{
//do something
});
Я использую CSV-файл размером около 80 МБ, но консольный проект идет до 1 ГБ.
То, что здесь происходит, - Zip ждет, когда обе последовательности подадут ему сигнал.
Последовательность csv очень быстро передает данные, поэтому хранит пакетные обновления в памяти и ожидает другую последовательность.
Что еще хуже, память не освобождается, даже когда все обновления обрабатываются. Если я удаляю Zip, память выглядит очень хорошо, похоже, что она высвобождает память во время обработки пакета (все приложение занимает всего около 20 МБ всего времени).
Два вопроса
Есть ли способ сообщить наблюдаемой, что я хочу приостановить чтение, пока не будет обработано предыдущее (в моем случае это буферизованные строки).
Почему память не освобождается после обработки всех обновлений, есть ли способ избежать этого?