Когда поток NamedPipeServer читает какие-либо данные из канала, он не реагирует на CancellationTokenSource.Cancel()
Почему это так?
Как я могу ограничить время ожидания на сервередля данных от клиента?
Код для воспроизведения:
static void Main(string[] args)
{
Server();
Clinet();
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}
private static async Task Server()
{
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
{
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
}
}
private static async Task Clinet()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}
Ожидаемый результат:
exit server
<client throws exception cuz server closed pipe>
Фактический результат:
client exit
exit server
РЕДАКТИРОВАТЬ
Ответ с CancelIo
кажется многообещающим, и он позволяет серверу завершить связь, когда токен отмены отменен.Однако я не понимаю, почему мой «базовый сценарий» перестал работать при использовании ReadPipeAsync
.
. Вот код, он включает в себя 2 клиентские функции:
Clinet_ShouldWorkFine
- хороший клиент, который читает / пишет во времени Clinet_ServerShouldEndCommunication_CuzClientIsSlow
- клиент слишком медленный, сервер должен прервать связь
Ожидается:
Clinet_ShouldWorkFine
- выполнение завершается без каких-либо исключений Clinet_ServerShouldEndCommunication_CuzClientIsSlow
- сервер закрывает канал, исключение выдает клиент
Факт:
Clinet_ShouldWorkFine
- сервер останавливается при первом вызове ReadPipeAsync
, канал закрывается через 1 с, клиент создает исключение Clinet_ServerShouldEndCommunication_CuzClientIsSlow
- сервер закрывает канал, клиент создает исключение
Почему Clinet_ShouldWorkFine
не работает, когда сервер использует ReadPipeAsync
class Program
{
static void Main(string[] args) {
// in this case server should close the pipe cuz client is too slow
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}
// in this case server should exchange data with client fine
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet_ShouldWorkFine();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}
private static async Task Server()
{
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
{
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
var buffer = new byte[4];
var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
}
}
private static async Task Clinet_ShouldWorkFine()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}
private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}
}
public static class AsyncPipeFixer {
public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var async = pipe.BeginRead(buffer, offset, count, null, null);
return new Task<int>(() => {
try { return pipe.EndRead(async); }
finally { registration.Dispose(); }
}, cancellationToken);
}
private static void CancelPipeIo(PipeStream pipe) {
// Note: no PipeStream.IsDisposed, we'll have to swallow
try {
CancelIo(pipe.SafePipeHandle);
}
catch (ObjectDisposedException) { }
}
[DllImport("kernel32.dll")]
private static extern bool CancelIo(SafePipeHandle handle);
}