NamedPipeServerStream.ReadAsync () не завершается, когда CancellationToken запрашивает cancell - PullRequest
0 голосов
/ 03 октября 2018

Когда поток NamedPipeServer читает какие-либо данные из канала, он не реагирует на CancellationTokenSource.Cancel()

Почему это так?

Как я могу ограничить время ожидания на сервередля данных от клиента?

Код для воспроизведения:

static void Main(string[] args)
{
    Server();
    Clinet();
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

private static async Task Server()
{
    using (var cancellationTokenSource = new CancellationTokenSource(1000))
    using (var server = new NamedPipeServerStream("test",
        PipeDirection.InOut,
        1,
        PipeTransmissionMode.Byte,
        PipeOptions.Asynchronous))
    {
        var cancellationToken = cancellationTokenSource.Token;
        await server.WaitForConnectionAsync(cancellationToken);
        await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
        var buffer = new byte[4];
        await server.ReadAsync(buffer, 0, 4, cancellationToken);
        Console.WriteLine("exit server");
    }
}

private static async Task Clinet()
{
    using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
    {
        var buffer = new byte[4];
        client.Connect();
        client.Read(buffer, 0, 4);
        await Task.Delay(5000);
        await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
        Console.WriteLine("client exit");
    }
}

Ожидаемый результат:

exit server
<client throws exception cuz server closed pipe>

Фактический результат:

client exit
exit server

РЕДАКТИРОВАТЬ

Ответ с CancelIo кажется многообещающим, и он позволяет серверу завершить связь, когда токен отмены отменен.Однако я не понимаю, почему мой «базовый сценарий» перестал работать при использовании ReadPipeAsync.

. Вот код, он включает в себя 2 клиентские функции:

  1. Clinet_ShouldWorkFine- хороший клиент, который читает / пишет во времени
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - клиент слишком медленный, сервер должен прервать связь

Ожидается:

  1. Clinet_ShouldWorkFine - выполнение завершается без каких-либо исключений
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - сервер закрывает канал, исключение выдает клиент

Факт:

  1. Clinet_ShouldWorkFine- сервер останавливается при первом вызове ReadPipeAsync, канал закрывается через 1 с, клиент создает исключение
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - сервер закрывает канал, клиент создает исключение

Почему Clinet_ShouldWorkFine не работает, когда сервер использует ReadPipeAsync

class Program
{
    static void Main(string[] args) {
        // in this case server should close the pipe cuz client is too slow
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        // in this case server should exchange data with client fine
        try {
            var tasks = new Task[3];
            tasks[0] = Server();
            tasks[1] = tasks[0].ContinueWith(c => {
                Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
            });
            tasks[2] = Clinet_ShouldWorkFine();
            Task.WhenAll(tasks).Wait();
        }
        catch (Exception ex) {
            Console.WriteLine(ex);
        }

        Console.WriteLine("press [enter] to exit");
        Console.ReadLine();
    }

    private static async Task Server()
    {
        using (var cancellationTokenSource = new CancellationTokenSource(1000))
        using (var server = new NamedPipeServerStream("test",
            PipeDirection.InOut,
            1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous))
        {
            var cancellationToken = cancellationTokenSource.Token;
            await server.WaitForConnectionAsync(cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
            var buffer = new byte[4];
            var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
            Console.WriteLine("exit server");
        }
    }

    private static async Task Clinet_ShouldWorkFine()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }

    private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
    {
        using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
        {
            var buffer = new byte[4];
            client.Connect();
            client.Read(buffer, 0, 4);
            client.Read(buffer, 0, 4);
            await Task.Delay(5000);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
            Console.WriteLine("client exit");
        }
    }
}

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => {
            try { return pipe.EndRead(async); }
            finally { registration.Dispose(); }
        }, cancellationToken);
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIo(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);

}

Ответы [ 3 ]

0 голосов
/ 07 октября 2018

.NET-программисты испытывают ужасные проблемы с async / await, когда пишут такие маленькие тестовые программы, как эта.Плохо сочиняется, все время черепахи.В этой программе отсутствует финальная черепаха, задачи зашли в тупик.Никто не заботится о том, чтобы продолжить выполнение задач, как это обычно происходит в (скажем) приложении с графическим интерфейсом.Чрезвычайно сложно также отлаживать.

Сначала внесите незначительное изменение, чтобы тупик был полностью виден:

   int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

Это убирает неприятный маленький угловой случай, метод Server делает все этопуть к сообщению «Сервер вышел».Хроническая проблема с классом Task состоит в том, что когда задача завершается или ожидаемый метод завершается синхронно, он пытается запустить продолжение напрямую.Это происходит, чтобы работать в этой программе.Принудительное получение асинхронного результата теперь делает очевидным тупик.


Следующим шагом является исправление Main (), чтобы эти задачи больше не могли блокироваться.Это может выглядеть так:

static void Main(string[] args) {
    try {
        var tasks = new Task[3];
        tasks[0] = Server();
        tasks[1] = tasks[0].ContinueWith(c => {
            Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
        });
        tasks[2] = Clinet();
        Task.WhenAll(tasks).Wait();
    }
    catch (Exception ex) {
        Console.WriteLine(ex);
    }
    Console.WriteLine("press [enter] to exit");
    Console.ReadLine();
}

Теперь у нас есть шанс опередить и решить проблему отмены.Класс NamedPipeServerStream не реализует сам ReadAsync, он наследует метод от одного из своих базовых классов, Stream.У этого есть скверная маленькая деталь, которая полностью недокументирована.Вы можете видеть это, только когда смотрите на исходный код framework .Он может обнаружить отмену только тогда, когда отмена произошла до , когда вы вызываете ReadAsync ().Как только чтение началось, оно больше не может видеть отмену.Основная проблема, которую вы пытаетесь решить.

Это решаемая проблема, но у меня есть смутная идея, почему Microsoft не сделала этого для PipeStreams.Обычный способ принудительного завершения метода BeginRead () - это Dispose () объекта, также единственный способ, которым Stream.ReadAsync () может быть прерван.Но есть и другой способ: в Windows можно прервать операцию ввода-вывода с помощью CancelIo () .Давайте сделаем это методом расширения:

using System;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.IO.Pipes;
using Microsoft.Win32.SafeHandles;

public static class AsyncPipeFixer {

    public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
        var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
        var async = pipe.BeginRead(buffer, offset, count, null, null);
        return new Task<int>(() => {
            try { return pipe.EndRead(async); }
            finally { registration.Dispose(); }
        }, cancellationToken);
    }

    private static void CancelPipeIo(PipeStream pipe) {
        // Note: no PipeStream.IsDisposed, we'll have to swallow
        try {
            CancelIo(pipe.SafePipeHandle);
        }
        catch (ObjectDisposedException) { }
    }
    [DllImport("kernel32.dll")]
    private static extern bool CancelIo(SafePipeHandle handle);

}

И, наконец, настроим сервер для его использования:

    int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

Имейте в виду, что этот обходной путь специфичен для Windows, поэтому не может работать вПрограмма .NETCore, нацеленная на Unix-версию.Затем рассмотрите более тяжелый молоток, вызовите pipe.Close () в методе CancelPipeIo ().

0 голосов
/ 12 октября 2018

Я просто смотрю на ваш код и, может быть, на него смотрят по-новому ...

Насколько я могу судить, как в вашем оригинальном, так и в дальнейших сложных сценариях ... выпередача уже отмененного токена отмены, что довольно непредсказуемо, как другие реализуют (если таковые имеются) исключения, возникающие в методах ...

Используйте свойство IsCancellationRequested, чтобы проверить, отменен ли токен ужеи не передавайте отмененные токены.

Вот пример добавления этого в ваш код из исходного вопроса (вы можете сделать то же самое для вашего более позднего ReadPipeAsync метода.

var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);

if(!cancellationToken.IsCancellationRequested)
{
    await server.WriteAsync(new byte[] { 1, 2, 3, 4 }, 0, 4, cancellationToken);
}

if(!cancellationToken.IsCancellationRequested)
{
    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);
}

Console.WriteLine("exit server");

приведенный выше код приведет к

exit server
client exit

, который, я думаю, был и вашим очень оригинальным вопросом ...

0 голосов
/ 06 октября 2018

ReadAsync Сначала проверьте отмену, затем начните чтение, если токен отменен, он не действует

добавьте следующую строку

cancellationToken.Register (server.Disconnect);

using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
    PipeDirection.InOut,
    1,
    PipeTransmissionMode.Byte,
    PipeOptions.Asynchronous))
{
    var cancellationToken = cancellationTokenSource.Token;
    cancellationToken.Register(server.Disconnect);
    await server.WaitForConnectionAsync(cancellationToken);
    await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
    var buffer = new byte[4];
    await server.ReadAsync(buffer, 0, 4, cancellationToken);
    Console.WriteLine("exit server");
}
...