Как использовать трюки asyn c для обработки смешанной задачи ввода-вывода и процессора в C#? - PullRequest
1 голос
/ 06 мая 2020

В основе проблемы лежит чтение данных из потока (привязка к вводу-выводу), обработка данных (привязка к ЦП), а затем запись в другой поток (привязка к вводу-выводу).

Наивный способ похож на это

thread 1: loop { |<--read data block from stream-->|<--process data block-->|<--write to stream-->| }

Наивный шаблон производитель-потребитель

thread 1: loop { |<--read data block from stream-->| enqueue data block to blocking queue A }

thread 2: loop { dequeue data block from blocking queue A |<--process data block-->| enqueue data block to blocking queue B }

thread 3: loop { dequeue data block from blocking queue B |<--write to stream-->| }

Пример потока выглядит следующим образом:

var hasher = MD5.Create();
using (FileStream readStream = new FileStream("filePath", FileMode.Open))
using (BufferedStream readBs = new BufferedStream(readStream ))
using (CryptoStream md5HashStream = new CryptoStream(readBs, hasher, CryptoStreamMode.Read))
using (FileStream writeStream= File.OpenWrite("destPath"))
using (BufferedStream writeBs = new BufferedStream(writeStream))
{
    md5HashStream.CopyTo(writeBs);
}

Как использовать C# asyn c такие трюки, как asyn c stream, channel, dataflow, чтобы преобразовать приведенный выше образец потока в шаблон производитель-покупатель, чтобы сократить время блокировки io?

Ответы [ 2 ]

0 голосов
/ 07 мая 2020

Вы должны использовать Microsoft Reactive Framework (также известный как Rx) - NuGet System.Reactive и добавить using System.Reactive.Linq; - тогда вы можете сделать это:

var query =
    Observable.Using(() => new FileStream(@"filePath", FileMode.Open), readStream =>
    Observable.Using(() => new BufferedStream(readStream), readBs =>
    Observable.Using(() => MD5.Create(), hasher =>
    Observable.Using(() => new CryptoStream(readBs, hasher, CryptoStreamMode.Read), md5HashStream =>
    Observable.Using(() => File.OpenWrite(@"destPath"), writeStream => Observable.Using(() => new BufferedStream(writeStream), writeBs =>
    Observable.FromAsync(() => md5HashStream.CopyToAsync(writeBs))))))));

query.Wait(); // or await query;

Я постоянно получаю результаты в 2-5 раз быстрее, чем ваш исходный код.

0 голосов
/ 06 мая 2020

Вы можете использовать методы ReadAsync и WriteAsync потоков для ожидания операций io и чтения некоторого фиксированного blockSize количества байтов в буфер. Однако, поскольку ReadAsync может читать меньше байтов по желанию, вам необходимо сделать так, чтобы blockSize байтов считывались с помощью al oop.

int blockSize = 1024;

using (FileStream readStream = new FileStream("filePath", FileMode.Open))
using (BufferedStream readBs = new BufferedStream(readStream ))
using (FileStream writeStream = File.OpenWrite("destPath"))
using (BufferedStream writeBs = new BufferedStream(writeStream))
{
    int offset;
    var buffer = new byte[blockSize];
    do {
        offset = 0;
        while (offset < buffer.Length) 
        { // make sure to read blockSize bytes
            var bytesRead = await readBs.ReadAsync(buffer, offset, buffer.Length - offset);
            if (bytesRead == 0) break;
            offset += bytesRead;
        }

        if (offset > 0) 
        {
            var result = DoSomethingWithData(buffer, offset); // assumtion: retuns a new byte[] with only relevant data

            await writeBs.WriteAsync(result, 0, result.Length);
        }
    } while (0 < offset);
}
...