.NET Асинхронный поток чтения / записи - PullRequest
46 голосов
/ 09 октября 2009

Я пытался решить это упражнение "Параллельное программирование" (на C #):

Зная, что класс Stream содержит методы int Read(byte[] buffer, int offset, int size) и void Write(byte[] buffer, int offset, int size), реализуйте в C # метод NetToFile, который копирует все данные, полученные из экземпляра NetworkStream net, в экземпляр FileStream file. Чтобы выполнить передачу, используйте асинхронное чтение и синхронную запись, избегая блокировки одного потока во время операций чтения. Передача заканчивается, когда операция чтения net возвращает значение 0. Для упрощения нет необходимости поддерживать контролируемую отмену операции.

void NetToFile(NetworkStream net, FileStream file);

Я пытался решить это упражнение, но я борюсь с вопросом, связанным с самим вопросом. Но сначала вот мой код:

public static void NetToFile(NetworkStream net, FileStream file) {
    byte[] buffer = new byte[4096]; // buffer with 4 kB dimension
    int offset = 0; // read/write offset
    int nBytesRead = 0; // number of bytes read on each cycle

    IAsyncResult ar;
    do {
        // read partial content of net (asynchronously)
        ar = net.BeginRead(buffer,offset,buffer.Length,null,null);
        // wait until read is completed
        ar.AsyncWaitHandle.WaitOne();
        // get number of bytes read on each cycle
        nBytesRead = net.EndRead(ar);

        // write partial content to file (synchronously)
        fs.Write(buffer,offset,nBytesRead);
        // update offset
        offset += nBytesRead;
    }
    while( nBytesRead > 0);
}

У меня вопрос в том, что в постановке вопроса сказано:

Для выполнения передачи используйте асинхронный читает и синхронно пишет, избегая один поток будет заблокирован во время чтения операции

Я не совсем уверен, что мое решение выполняет то, что нужно в этом упражнении, потому что я использую AsyncWaitHandle.WaitOne(), чтобы дождаться завершения асинхронного чтения.

С другой стороны, я не совсем понимаю, что в этом сценарии должно быть «неблокирующим» решением, поскольку запись FileStream должна выполняться синхронно ... и делать это Мне нужно подождать, пока NetworkStream чтение завершится, чтобы продолжить запись FileStream, не так ли?

Не могли бы вы помочь мне с этим?


[РЕДАКТИРОВАТЬ 1] Использование Обратный вызов Решение

Хорошо, если я понял, что Митчел Селлерс и willvv ответили, мне посоветовали использовать метод обратного вызова, чтобы превратить это в "неблокирующее" решение. Вот мой код:

byte[] buffer; // buffer

public static void NetToFile(NetworkStream net, FileStream file) {
    // buffer with same dimension as file stream data
    buffer = new byte[file.Length];
    //start asynchronous read
    net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}

//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
    //NetworkStream retrieve
    NetworkStream net = (NetworkStream) ar.IAsyncState;
    //get number of bytes read
    int nBytesRead = net.EndRead(ar);

    //write content to file
    //... and now, how do I write to FileStream instance without
    //having its reference??
    //fs.Write(buffer,0,nBytesRead);
}

Как вы могли заметить, я застрял в методе обратного вызова, так как у меня нет ссылки на экземпляр FileStream, где я хочу вызвать метод "Write (...)".

Кроме того, это не поточно-ориентированное решение, так как поле byte[] открыто и может использоваться несколькими одновременными вызовами NetToFile. Я не знаю, как решить эту проблему, не выставляя это поле byte[] во внешней области видимости ... и я почти уверен, что оно не может быть открыто таким образом.

Я не хочу использовать лямбда-решение или решение с анонимным методом, потому что это не входит в программу курса "Параллельное программирование".

Ответы [ 6 ]

52 голосов
/ 10 ноября 2010

Даже если это идет против зерна, чтобы помочь людям с домашней работой, учитывая, что этому больше года, вот правильный способ сделать это. Все что вам нужно перекрывать ваши операции чтения / записи & mdash; не нужно создавать дополнительные темы или что-то еще.

public static class StreamExtensions
{
    private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
    public static void CopyTo( this Stream input , Stream output )
    {
        input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ;
        return ;
    }
    public static void CopyTo( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException(   "input must be open for reading"  );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 }                                       ;
        int          bufno = 0 ;
        IAsyncResult read  = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
        IAsyncResult write = null ;

        while ( true )
        {

            // wait for the read operation to complete
            read.AsyncWaitHandle.WaitOne() ; 
            bufl[bufno] = input.EndRead(read) ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break ;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                write.AsyncWaitHandle.WaitOne() ;
                output.EndWrite(write) ;
            }

            // start the new write operation
            write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            write.AsyncWaitHandle.WaitOne() ;
            output.EndWrite(write) ;
        }

        output.Flush() ;

        // return to the caller ;
        return ;
    }


    public static async Task CopyToAsync( this Stream input , Stream output )
    {
        await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ;
        return;
    }

    public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 } ;
        int          bufno = 0 ;
        Task<int>    read  = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
        Task         write = null ;

        while ( true )
        {

            await read ;
            bufl[bufno] = read.Result ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                await write ;
            }

            // start the new write operation
            write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            await write;
        }

        output.Flush();

        // return to the caller ;
        return;
    }

}

Приветствие.

16 голосов
/ 31 октября 2013

Я сомневаюсь, что это самый быстрый код (есть некоторые издержки из абстракции .NET Task), но я думаю, что это чище подход ко всему асинхронному копированию.

Мне нужно было CopyTransformAsync, где я мог бы передать делегату что-то сделать, так как чанки проходили через операцию копирования. например вычислить дайджест сообщения при копировании. Вот почему я заинтересовался катанием своего собственного варианта.

Выводы:

  • CopyToAsync bufferSize является чувствительным (требуется большой буфер)
  • FileOptions.Asynchronous -> делает его ужасно медленным (не знаю точно, почему это так)
  • bufferSize объектов FileStream может быть меньше (это не так важно)
  • Тест Serial явно самый быстрый и наиболее ресурсоемкий

Вот что я нашел и полный исходный код для программы, которую я использовал для проверки этого. На моей машине эти тесты выполнялись на диске SSD и являются эквивалентом копии файла. Как правило, вы не хотите использовать это для простого копирования файлов, вместо этого, когда у вас есть сетевой поток (таков мой вариант использования), именно тогда вы захотите использовать что-то подобное.

4K buffer

Serial...                                in 0.474s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    timed out
CopyTransformAsync (Asynchronous)...     timed out

8K buffer

Serial...                                in 0.344s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 1.116s
CopyTransformAsync (Asynchronous)...     timed out

40K buffer

Serial...                                in 0.195s
CopyToAsync...                           in 0.624s
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 0.378s
CopyTransformAsync (Asynchronous)...     timed out

80K buffer

Serial...                                in 0.190s
CopyToAsync...                           in 0.355s
CopyToAsync (Asynchronous)...            in 1.196s
CopyTransformAsync...                    in 0.300s
CopyTransformAsync (Asynchronous)...     in 0.886s

160K buffer

Serial...                                in 0.432s
CopyToAsync...                           in 0.252s
CopyToAsync (Asynchronous)...            in 0.454s
CopyTransformAsync...                    in 0.447s
CopyTransformAsync (Asynchronous)...     in 0.555s

Здесь вы можете увидеть Process Explorer, график производительности при запуске теста. По сути, каждый top (на нижнем из трех графиков) является началом последовательного теста. Вы можете ясно видеть, как пропускная способность резко увеличивается с увеличением размера буфера. Это выглядело бы так, как если бы оно планировалось где-то около 80 КБ, что и используется внутри .NET framework CopyToAsync.

Performance Graph

Приятно то, что окончательная реализация не была такой сложной:

static Task CompletedTask = ((Task)Task.FromResult(0));
static async Task CopyTransformAsync(Stream inputStream
    , Stream outputStream
    , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
    )
{
    var temp = new byte[bufferSize];
    var temp2 = new byte[bufferSize];

    int i = 0;

    var readTask = inputStream
        .ReadAsync(temp, 0, bufferSize)
        .ConfigureAwait(false);

    var writeTask = CompletedTask.ConfigureAwait(false);

    for (; ; )
    {
        // synchronize read
        int read = await readTask;
        if (read == 0)
        {
            break;
        }

        if (i++ > 0)
        {
            // synchronize write
            await writeTask;
        }

        var chunk = new ArraySegment<byte>(temp, 0, read);

        // do transform (if any)
        if (!(transform == null))
        {
            chunk = transform(chunk);
        }

        // queue write
        writeTask = outputStream
            .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
            .ConfigureAwait(false);

        // queue read
        readTask = inputStream
            .ReadAsync(temp2, 0, bufferSize)
            .ConfigureAwait(false);

        // swap buffer
        var temp3 = temp;
        temp = temp2;
        temp2 = temp3;
    }

    await writeTask; // complete any lingering write task
}

Этот метод чередования чтения / записи, несмотря на огромные буферы, где-то на 18% быстрее, чем BCL CopyToAsync.

Из любопытства я изменил асинхронные вызовы на типичные асинхронные вызовы начала / конца, и это не улучшило ситуацию ни на один бит, а только ухудшило ситуацию. Для всех, кого я люблю использовать для абстракции Task, они делают некоторые изящные вещи, когда вы пишете код с ключевыми словами async / await, и читать этот код гораздо приятнее!

12 голосов
/ 09 октября 2009

Вам понадобится использовать обратный вызов из чтения NetStream для обработки этого. И, честно говоря, может быть проще обернуть логику копирования в свой собственный класс, чтобы вы могли поддерживать экземпляр активных потоков.

Вот как я бы подошел (не проверено):

public class Assignment1
{
    public static void NetToFile(NetworkStream net, FileStream file) 
    {
        var copier = new AsyncStreamCopier(net, file);
        copier.Start();
    }

    public static void NetToFile_Option2(NetworkStream net, FileStream file) 
    {
        var completedEvent = new ManualResetEvent(false);

        // copy as usual but listen for completion
        var copier = new AsyncStreamCopier(net, file);
        copier.Completed += (s, e) => completedEvent.Set();
        copier.Start();

        completedEvent.WaitOne();
    }

    /// <summary>
    /// The Async Copier class reads the input Stream Async and writes Synchronously
    /// </summary>
    public class AsyncStreamCopier
    {
        public event EventHandler Completed;

        private readonly Stream input;
        private readonly Stream output;

        private byte[] buffer = new byte[4096];

        public AsyncStreamCopier(Stream input, Stream output)
        {
            this.input = input;
            this.output = output;
        }

        public void Start()
        {
            GetNextChunk();
        }

        private void GetNextChunk()
        {
            input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
        }

        private void InputReadComplete(IAsyncResult ar)
        {
            // input read asynchronously completed
            int bytesRead = input.EndRead(ar);

            if (bytesRead == 0)
            {
                RaiseCompleted();
                return;
            }

            // write synchronously
            output.Write(buffer, 0, bytesRead);

            // get next
            GetNextChunk();
        }

        private void RaiseCompleted()
        {
            if (Completed != null)
            {
                Completed(this, EventArgs.Empty);
            }
        }
    }
}
11 голосов
/ 30 декабря 2010

Ого, это все очень сложно! Вот мое асинхронное решение, и это всего лишь одна функция. Read () и BeginWrite () работают одновременно.

/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
///  and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If no data can be read for 60 seconds, the copy will time-out.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
    // This stream copy supports a source-read happening at the same time
    // as target-write.  A simpler implementation would be to use just
    // Write() instead of BeginWrite(), at the cost of speed.

    byte[] readbuffer = new byte[4096];
    byte[] writebuffer = new byte[4096];
    IAsyncResult asyncResult = null;

    for (; ; )
    {
        // Read data into the readbuffer.  The previous call to BeginWrite, if any,
        //  is executing in the background..
        int read = source.Read(readbuffer, 0, readbuffer.Length);

        // Ok, we have read some data and we're ready to write it, so wait here
        //  to make sure that the previous write is done before we write again.
        if (asyncResult != null)
        {
            // This should work down to ~0.01kb/sec
            asyncResult.AsyncWaitHandle.WaitOne(60000);
            target.EndWrite(asyncResult); // Last step to the 'write'.
            if (!asyncResult.IsCompleted) // Make sure the write really completed.
                throw new IOException("Stream write failed.");
        }

        if (read <= 0)
            return; // source stream says we're done - nothing else to read.

        // Swap the read and write buffers so we can write what we read, and we can
        //  use the then use the other buffer for our next read.
        byte[] tbuf = writebuffer;
        writebuffer = readbuffer;
        readbuffer = tbuf;

        // Asynchronously write the data, asyncResult.AsyncWaitHandle will
        // be set when done.
        asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
    }
}
9 голосов
/ 11 июля 2011

Странно, что никто не упомянул TPL.
Вот очень приятный пост от команды PFX (Стивен Тауб) о том, как реализовать одновременную асинхронную потоковую копию. Пост содержит устаревшие ссылки на образцы, поэтому следующий:
Получить Параллельные расширения Extras из code.msdn затем

var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait when it finishes:
task.Wait();

Также рассмотрите возможность использования AsyncEnumerator Дж. Ричера .

0 голосов
/ 09 октября 2009

Вы правы, то, что вы делаете, это в основном синхронное чтение, потому что вы используете метод WaitOne (), и он просто останавливает выполнение до тех пор, пока данные не будут готовы, это практически то же самое, что делать это с помощью Read () вместо этого. BeginRead () и EndRead ().

Что вам нужно сделать, это использовать аргумент обратного вызова в методе BeginRead (), с его помощью вы определяете метод обратного вызова (или лямбда-выражение), этот метод будет вызываться, когда информация будет прочитана (в метод обратного вызова, вы должны проверить конец потока и записать его в выходной поток), таким образом, вы не будете блокировать основной поток (вам не понадобятся WaitOne () или EndRead ().

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...