В настоящее время я использую RX Framework для реализации конвейера обработки сообщений, похожего на рабочий процесс. По сути, у меня есть производитель сообщений (десериализует сетевые сообщения и вызывает OnNext () для субъекта), и у меня есть несколько потребителей.
ПРИМЕЧАНИЕ. Если и преобразование - это методы расширения, которые я кодировал, которые просто возвращают IObservable.
Потребитель делает что-то вроде следующего:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
затем используется другим аналогичным конвейером, и это продолжается до вершины, где он заканчивается тем, что кто-то вызывает Subscribe()
в последнем конвейере. Проблема, с которой я сталкиваюсь, заключается в том, что сообщения из базы не распространяются вверх, если только подписка не вызывается непосредственно для сообщений.
Как я могу отправить сообщения на вершину стека? Я знаю, что это неортодоксальный подход, но я чувствую, что он делает код очень простым для понимания того, что происходит с сообщением. Кто-нибудь может предложить другой способ сделать то же самое, если вы чувствуете, что это совершенно ужасная идея?