отправка следующего сообщения с ответом Rx - PullRequest
3 голосов
/ 22 сентября 2011

с учетом List<Message> я отправляю первое сообщение с моим Send(message). Теперь я хотел бы дождаться ответа (асинхронного), прежде чем я отправлю следующее сообщение ...

Блокировать до уведомления «по старому»

я знаю, как реализовать решение на основе событий для этой ситуации, используя блокировку потоков / с Monitor.Wait и Monitor.Pulse

Реактивный «новый» способ?

Но мне было интересно, имеет ли смысл использовать Reactive Extensions здесь?

Если Rx принесет здесь полезные выгоды, то как мог бы я сделать ответ реактивным путем следующего вызова send? Очевидно, что это будет включать IObservable, вероятно, два в качестве первичных источников, но что тогда?

Ответы [ 4 ]

2 голосов
/ 22 сентября 2011

Вопрос не очень конкретный и кажется очень общим в том смысле, что вы не упомянули, кто является отправителем и т. Д., Поэтому ответ также будет очень общим:)

var receiveObs = //You have created a observable around the receive mechanism 
var responses = messages.Select(m => {
   send(m);
   return receiveObs.First();
}).ToList();
2 голосов
/ 22 сентября 2011

Я думаю, что Rx - хороший выбор, но я думаю, что я мог что-то упустить в ваших требованиях.Из того, что я понимаю, Rx предлагает очень простое решение.

Если у вас уже есть список сообщений, вы можете отправить их реактивно следующим образом:

messages
    .ToObservable()
    .ObserveOn(Scheduler.ThreadPool)
    .Subscribe(m =>
    {
        Send(m);
    });

Это переводит вызовы на Send в пул потоков, и, благодаря встроенному поведению наблюдаемых, каждый вызов Send ожидает завершения предыдущего вызова.

Поскольку все это происходит в другом потоке, ваш код не является-блокирование.

Дополнительным преимуществом Rx является то, что вам не нужно менять поведение или сигнатуру вашего Send метода, чтобы эта работа работала.

Просто, да?

Я проверил это, и оно работало нормально, учитывая мое понимание вашей проблемы.Это все, что тебе нужно, или я что-то пропустил?

1 голос
/ 22 сентября 2011

Если ваш метод Send соответствует модели APM, следующий подход должен работать для вас

List<Message> messages;
IObservable<Response> xs;
xs =  messages.ToObservable().SelectMany(msg => Observable.FromAsyncPattern(Send, msg));

Изменить - это не сработает, как предложил Андерсон, вот пример, показывающий проблему

Func<int,string> Send = (ii) => { "in Send".Dump(); Thread.Sleep(2000); return ii.ToString(); };
Func<int,IObservable<string>> sendIO =  Observable.FromAsyncPattern<int,string>(Send.BeginInvoke, Send.EndInvoke);
(new [] { 1, 2, 3 }).ToObservable().SelectMany(sendIO).Dump();
1 голос
/ 22 сентября 2011

Я не уверен, что Rx здесь хорошо подходит.Rx основан на концепции «принудительных сборов», то есть передачи данных потребителям вместо их извлечения.То, что вам нужно, это вытащить первый элемент, отправить его асинхронно и перейти к следующему элементу после завершения асинхронной операции.Для такой работы идеальным инструментом будет async / await *!

async void SendMessages(List<Message> messages)
{
    foreach (Message message in messages)
    {
        await SendAsync(message);
    }
}

с

Task SendAsync(Message message);

* доступно в Async CTP или .NET 4.5 Preview

...