Составление командной шины с использованием RX - PullRequest
1 голос
/ 12 февраля 2012

Я знакомлюсь с RX и в качестве своего экспериментального проекта я пытаюсь создать простую командную шину, концептуально подобную этой:

class Bus
{
    Subject<Command> commands;
    IObservable<Invocation> invocations;

    public Bus()
    {
        this.commands = new Subject<Command>();
        this.invocations = commands.Select(x => new Invocation { Command = x }).Publish();
    }

    public IObserver<Command> Commands
    {
        get { return this.commands; }
    }

    public IObservable<Invocation> Invocations
    {
        get { return this.invocations; }
    }
}

class Invocation
{
    public Command Command { get; set; }
    public bool Handled { get; set; }
}

Идея состоит в том, что модули могут устанавливать обработчики команд при запускеиспользуя свойство Invocations и может применить любую фильтрацию, какую пожелает, к своей подписке.С другой стороны, клиенты могут инициировать выполнение команды, вызывая Commands.OnNext(command).

. Однако я хотел бы, чтобы шина гарантировала, что каждая отправленная команда будет обрабатываться ровно одним обработчиком.То есть обработка OnNext в идеале должна прекратиться, как только первый обработчик установит Invocation.Handled на true и должен выдать исключение, если при выводе OnNext(), Invocation.Handled все еще равно false.

Я играл над созданием своих собственных реализаций ISubject, IObservable и IObserver, но это кажется «грязным и дешевым»;)

Я стараюсь осмыслить композиционную мощь, которую обеспечивает RX.Композиционно, как я могу предоставить гарантию «ровно один раз»?

Спасибо за любые идеи, которые вы можете предоставить.

Ответы [ 2 ]

5 голосов
/ 13 февраля 2012

У вас вообще есть правильная идея, на самом деле.Вам просто нужно сделать фактическую отправку.В этом поможет SelectMany:

class Bus
{
    Subject<Command> commands;
    Subject<Invocation> invocations;

    // TODO: Instantiate me
    List<Func<Command, bool>> handlerList; 

    public Bus()
    {
        this.commands = new Subject<Command>();
        this.invocations = new Subject<Invocation>();

        commands.SelectMany(x => {
            // This FirstOrDefault() is just good ol' LINQ
            var passedHandler = 
                handlerList.FirstOrDefault(handler => handler(x) == true);

            return passedHandler != null ?
                Observable.Return(new Invocation() { Command = x, Handled = true}) :
                Observable.Throw<Invocation>(new Exception("Unhandled!"));
        }).Multicast(invocations).Connect();
    }

    /* ... snip ... */
}

Но, если честно, это не совсем демонстрирует мощь Rx, потому что он выполняет список обработчиков синхронно.Давайте сделаем это более убедительным, сделав его полностью неблокирующим.

Во-первых, мы изменим наш прототип Func на Func<Command, IObservable<Invocation>>.Это означает, что метод принимает команду и выдает результат Future Invocation (a-la Task<T>).Затем мы можем получить идентичное поведение, но при этом наши селекторы будут асинхронными с помощью этого селектора (кодирование через TextArea впереди):

commands.SelectMany(x =>
    handlerList.ToObservable()
        .Select(h => Observable.Defer(() => h(x)))
        .Concat()
        .SkipWhile(x => x.Handled == false)
        .TakeLast(1))
    .Multicast(invocations).Connect();

Это довольно хорошее использование Rx на уровне выпускника, но идея заключается в том, что для каждогоКоманда, мы собираемся сначала создать поток обработчиков и запустить их по порядку (это то, что делает Defer + Concat), пока мы не найдем тот, у которого Handled - true, а затем возьмем последний.

Внешний SelectMany выбирает поток команд в поток будущих результатов (т. Е. Тип IO<IO<Invocation>>, а затем сглаживает его, так что он становится потоком результатов.

Нет блокировки никогда,очень лаконичный, на 100% тестируемый, типобезопасный код, который просто выражал довольно сложную идею, которую было бы очень уродливо писать настоятельно. Вот почему Rx - это круто.

0 голосов
/ 12 февраля 2012

Хотя, возможно, вы можете создать «точно один раз» субъект, вы не должны . Интерфейс (и все операторы в библиотеке) подразумевают, что все наблюдатели будут уведомлены (игнорируя возможность исключений в вызовах OnNext).

Что вы можете сделать, это создать альтернативный набор интерфейсов, которые определяют желаемую семантику:

interface IHandlableObservable<T>
{
    //gets first chance at the notification
    IDisposable SubscribeFirst(IHandlingObserver<T> observer);
    //gets last chance at the notification
    IDisposable SubscribeLast(IHandlingObserver<T> observer);
    //starts the notification (possibly subscribing to an underlying IObservable)
    IDisposable Connect();
}

interface IHandlingObserver<T>
{
    //return indicates if the observer "handled" the value
    bool OnNext(T value);
    void OnError(Exception ex);
    void OnCompleted();
}

Затем вы можете определить методы, которые позволят вам преобразовывать обычные наблюдаемые в доступные для обработки наблюдаемые, чтобы вы могли сохранить большую часть логики в стандартных RX-операторах.

...