Каков наилучший способ ожидания сетевого пакета с использованием новой асинхронной функции C # - PullRequest
6 голосов
/ 12 сентября 2011

Я недавно поигрался с новой Async CTP, и я столкнулся с ситуацией, когда я не уверен, как действовать.

В моей текущей кодовой базе я используюпонятие «рабочие места» и «менеджер по работе».Задания существуют исключительно с целью обработки исходного сообщения, отправки ответа и его ожидания.

У меня уже есть существующий код, основанный на синхронных сокетах, где сетевой поток ожидает поступления данных, изатем передать его обработчику событий и, в конечном итоге, менеджеру работ.

Менеджер работ ищет, какое задание обработает сообщение, и передает его.

Таким образом, сценарий таков:

  1. Менеджер заданий получает новое сообщение и запускает задание.
  2. Задание запускается, обрабатывает сообщение и отправляет ответное сообщение.
  3. На этом этапезадание будет ожидать ответа на ответ.

Вот пример псевдокода:

class MyJob : Job
{
    public override void RunJob( IPacketMsg packet )
    {
        // handle packet

        var myReply = new Packet();
        SendReply( myReply );

        await GetResponse();
    }
}

Но я не совсем уверен, как действовать на шаге 3. ЗаданиеМенеджер получит ответ, а затем передаст его выполненной работе.Но я не уверен, как заставить работу ждать ответа.

Я подумал о создании ожидаемой Задачи, которая просто блокирует WaitHandle, но это лучшее решение?

Могу ли я сделать что-нибудь еще в этом случае?

Редактировать Что касается Async CTP, что происходит в ситуации, когда пользовательский интерфейс не используется.Я перечитал блог Эрика Липперта об Async, но я не думаю, что он когда-либо затрагивал тему того, как все работает в фоновом режиме без потока пользовательского интерфейса (это работает в фоновом режиме или ...?)

Ответы [ 3 ]

5 голосов
/ 13 сентября 2011
  1. Менеджер заданий получает новое сообщение и запускает задание.
  2. Задание запускается, обрабатывает сообщение и отправляет ответное сообщение.
  3. На этом этапезадание будет ожидать ответа на ответ.

Прежде всего, я должен отметить, что Async CTP очень хорошо обрабатывает асинхронные операции , но асинхронные события не так много.Возможно, вы захотите рассмотреть подход на основе Rx.Но давайте сейчас перейдем к Async CTP.

У вас есть два основных варианта создания задач:

  • С делегатом.например, Task.Factory.StartNew запустит делегат в пуле потоков.Пользовательские фабрики задач и планировщики предоставляют больше возможностей для делегатов задач (например, указание, что делегат должен выполняться в потоке STA).
  • Без делегата.например, TaskFactory.FromAsync оборачивает существующую пару методов Begin / End, TaskEx.FromResult возвращает «будущую константу», а TaskCompletionSource может использоваться для явного управления Task (как FromAsync, так и * 1030).* используйте TCS для внутреннего использования.

Если обработка задания связана с ЦП, имеет смысл передать его на Task.Factory.StartNew.Я предполагаю, что обработка задания связана с процессором.

Псевдокод менеджера заданий:

// Responds to a new message by starting a new job on the thread pool.
private void RespondToNewMessage(IPacketMsg message)
{
  IJob job = ..;
  Task.Factory.StartNew(job.RunJob(message));
}

// Holds tasks waiting for a response.
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;

// Asynchronously gets a response for the specified reply.
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
  var tcs = new TaskCompletionSource<IResponse>();
  responseTasks.Add(replyId, tcs);
  return tcs.Task;
}

// Responds to a new response by completing and removing its task.
private void RespondToResponse(IResponse response)
{
  var tcs = responseTasks[response.ReplyId];
  responseTasks.Remove(response.ReplyId);
  tcs.TrySetComplete(response);
}

Идея состоит в том, что менеджер заданий также управляет списком выдающихся ответов.Чтобы это произошло, я ввел простой int идентификатор ответа, который менеджер заданий может использовать для определения того, какой ответ соответствует какому ответу.

Теперь задания могут работать следующим образом:

public override void RunJob(IPacketMsg packet)
{
  // handle packet
  var myReply = new Packet();
  var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
  SendReply(myReply);

  await response;
}

Есть несколько хитрых вещей, поскольку мы помещаем задания в поток пула потоков:

  1. GetResponseForReplyAsync должен быть вызван (регистрация задачи) перед ответомотправляется, а затем await ed позже.Это сделано для того, чтобы избежать ситуации, когда ответ может быть отправлен и ответ получен до того, как мы сможем его зарегистрировать.
  2. RespondToResponse удалит регистрацию задачи до ее завершения, на всякий случай, если задача будет завершена.вызывает отправку другого ответа с тем же идентификатором.

Если задания достаточно короткие, чтобы их не нужно было помещать в поток пула потоков, решение может быть упрощено.

3 голосов
/ 13 сентября 2011

Что касается Async CTP, что происходит в ситуации, когда пользовательский интерфейс не используется. Я перечитал блог Эрика Липперта об Async, но я не думаю, что он когда-либо затрагивал тему о том, как все работает в фоновом режиме без потока пользовательского интерфейса (это работает в фоновом режиме или ...?)

await вернется в контекст выполнения. В процессе пользовательского интерфейса это цикл сообщений пользовательского интерфейса. В ASP.NET это пул потоков ASP.NET. В других ситуациях (консольные приложения и службы Win32) контекст отсутствует, поэтому продолжения помещаются в очередь на ThreadPool. Обычно это нежелательное поведение, поэтому я написал класс AsyncContext, который можно использовать в таких ситуациях.

BackgroundWorker не используется. В сценарии на стороне сервера, таком как ваш, весьма обычно вообще не иметь фонового потока.

1 голос
/ 13 сентября 2011

Вы просто подключили бы остальную часть вашего обработчика событий к шаблону ожидания следующим образом:

 public async void RunJob(IPacketMsg msg)
 {
     // Do Stuff

     var response = await GetResponse();

     // response is "string", not "Task<string>"

     // Do More Stuff
 }

 public Task<string> GetResponse()
 {
     return Task.Factory.StartNew(() =>
        {
             _networkThingy.WaitForDataAvailable();

             return _networkThingy.ResponseString;
        });
 }

Когда ваша задача get response завершится, остальная часть метода возобновит выполнение в текущем контексте синхронизации,Однако до тех пор выполнение вашего метода не будет выполнено (поэтому любой код после ожидания не будет запущен до завершения задачи, запущенной в GetResponse)

...