RX IO, наблюдаемый как конвейер - PullRequest
4 голосов
/ 11 февраля 2010

В настоящее время я использую RX Framework для реализации конвейера обработки сообщений, похожего на рабочий процесс. По сути, у меня есть производитель сообщений (десериализует сетевые сообщения и вызывает OnNext () для субъекта), и у меня есть несколько потребителей.

ПРИМЕЧАНИЕ. Если и преобразование - это методы расширения, которые я кодировал, которые просто возвращают IObservable.

Потребитель делает что-то вроде следующего:

 var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
                              .Where(y => y.Value > 5)
                              .Select(y => y.ComplexObject)
                              .If(z => z.IsPaid, respond(z))
                              .Do(z => SendError(z));

commerceRequest затем используется другим аналогичным конвейером, и это продолжается до вершины, где он заканчивается тем, что кто-то вызывает Subscribe() в последнем конвейере. Проблема, с которой я сталкиваюсь, заключается в том, что сообщения из базы не распространяются вверх, если только подписка не вызывается непосредственно для сообщений.

Как я могу отправить сообщения на вершину стека? Я знаю, что это неортодоксальный подход, но я чувствую, что он делает код очень простым для понимания того, что происходит с сообщением. Кто-нибудь может предложить другой способ сделать то же самое, если вы чувствуете, что это совершенно ужасная идея?

1 Ответ

1 голос
/ 15 февраля 2010

Зачем им идти по конвейеру, если нет подписчиков? Если один из ваших промежуточных шагов полезен для их побочных эффектов (вы хотите, чтобы они запускались, даже если других подписчиков нет), вам следует переписать операцию побочного эффекта, чтобы она была подписчиком.

Вы также можете сделать шаг с побочным эффектом в качестве сквозной операции (или ти, если хотите), если хотите продолжить цепочку.

...