Использование Reactive extension (Rx) для приема сообщений MSMQ с использованием асинхронного шаблона (queue.BeginReceive, queue.EndReceive) - PullRequest
10 голосов
/ 04 ноября 2011

Я уже некоторое время использую Rx для событий в своих проектах и ​​специально для программирования на Socket, и хорошая часть - это то, что у него все хорошо. Управление моим кодом, преимущество в производительности и намного лучшее выполнение и интерпретация.

В последнее время я должен изменить поток процессов моего проекта, где мне нужно выгрузить все входящие данные (из операций с сокетами) в очереди ( с использованием реализации MSMQ, как было решено для очереди ).

Поскольку MSMQ обеспечивает асинхронный вызов для удаления сообщений из очереди (но в странной схеме). Я изо всех сил пытался использовать Rx для этой цели, но включил это.

Вопрос: Может ли кто-нибудь дать мне чистый пример кода для реализации Rx для получения сообщений из очереди с использованием шаблона Async.

Мне нужна реализация асинхронного оператора для MSMQ, похожая на что-то вроде этого

var data = Observable.FromAsyncPattern<byte[]>(
                        this.receiverSocket.BeginReceive,
                        this.receiverSocket.EndReceive(some parameters);

Спасибо заранее. * ура * в Rx и .NET

1 Ответ

4 голосов
/ 05 ноября 2011

Это будет так же просто, как:

var queue = new System.Messaging.MessageQueue("test");
var fun = Observable.FromAsyncPattern((cb, obj) => queue.BeginReceive(TimeSpan.FromMinutes(10),obj,cb), a => queue.EndReceive(a));
var obs = fun();
...