Обработка пакета запроса с помощью Reactive Extension - PullRequest
1 голос
/ 31 октября 2011

Я изучаю Reactive Extensions и пытаюсь выяснить, подходит ли это для такой задачи, как эта.

У меня есть метод Process (), который обрабатывает пакет запросов как единицу работы и вызывает обратный вызов, когда все запросы выполнены.

Здесь важно то, что каждый запрос будет вызывать обратный вызов либо синхронным, либо асинхронным, в зависимости от его реализации, и пакетный процессор должен уметь обрабатывать оба.

Но потоки не запускаются изпакетный процессор, любые новые потоки (или другое асинхронное выполнение) будут инициированы из обработчиков запросов, если это необходимо.Я не знаю, соответствуют ли они сценариям использования rx.

Мой текущий рабочий код выглядит (почти) так:

public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)
{
    IUnitOfWork uow = null;
    try
    {
        uow = unitOfWorkFactory.Create();

        var responses = new List<IResponse>();
        var outstandingRequests = requests.Count;
        foreach (var request in requests)
        {
            var correlationId = request.CorrelationId;
            Action<IResponse> requestCallback = response =>
            {
                response.CorrelationId = correlationId;
                responses.Add(response);
                outstandingRequests--;
                if (outstandingRequests != 0)
                    return;

                uow.Commit();
                onCompleted(responses);
            };

            requestProcessor.Process(request, requestCallback);
        }
    }
    catch(Exception)
    {
        if (uow != null) 
            uow.Rollback();
    }

    if (uow != null) 
        uow.Commit();
}        

Как бы вы реализовали это с помощью rx?Разумно ли это?

Обратите внимание, что единица работы должна выполняться синхронно, даже если есть асинхронные запросы, которые еще не вернулись.

Ответы [ 2 ]

3 голосов
/ 31 октября 2011

Мой подход к этому двухступенчатый.

Сначала создайте оператор общего назначения, который превращает Action<T, Action<R>> в Func<T, IObservable<R>>:

public static class ObservableEx
{
    public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
        this Action<T, Action<R>> call)
    {
        if (call == null) throw new ArgumentNullException("call");
        return t =>
        {
            var subject = new AsyncSubject<R>();
            try
            {
                Action<R> callback = r =>
                {
                    subject.OnNext(r);
                    subject.OnCompleted();
                };
                call(t, callback);
            }
            catch (Exception ex)
            {
                return Observable.Throw<R>(ex, Scheduler.ThreadPool);
            }
            return subject.AsObservable<R>();
        };
    }
}

Затем включите вызов void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted) в IObservable<IResponse> Process(IObservable<IRequest> requests):

public IObservable<IResponse> Process(IObservable<IRequest> requests)
{
    Func<IRequest, IObservable<IResponse>> rq2rp =
        ObservableEx.FromAsyncCallbackPattern
            <IRequest, IResponse>(requestProcessor.Process);

    var query = (
        from rq in requests
        select rq2rp(rq)).Concat();

    var uow = unitOfWorkFactory.Create();
    var subject = new ReplaySubject<IResponse>();

    query.Subscribe(
        r => subject.OnNext(r),
        ex =>
        {
            uow.Rollback();
            subject.OnError(ex);
        },
        () =>
        {
            uow.Commit();
            subject.OnCompleted();
        });

    return subject.AsObservable();
}

Теперь не только выполняется асинхронная обработка, но и обеспечивается правильный порядок результатов.

На самом деле, поскольку вы начинаете с коллекции, вы можете даже сделать это:

var rqs = requests.ToObservable();

var rqrps = rqs.Zip(Process(rqs),
    (rq, rp) => new
    {
        Request = rq,
        Response = rp,
    });

Тогда у вас будет наблюдаемое, которое объединяет в пары каждый запрос / ответ без необходимости в CorrelationId свойстве.

Надеюсь, это поможет.

1 голос
/ 31 октября 2011

Это часть гениальности Rx, поскольку вы можете возвращать результаты синхронно или асинхронно:

public IObservable<int> AddNumbers(int a, int b) {
    return Observable.Return(a + b);  
}

public IObservable<int> AddNumbersAsync(int a, int b) {
    return Observable.Start(() => a + b, Scheduler.NewThread);
}

Они оба имеют тип IObservable, поэтому они работают одинаково.Если вы хотите выяснить, когда все IObservables завершены, Aggregate сделает это, поскольку он превратит 'n' элементов в Observable в 1 элемент, который возвращается в конце:

IObservable<int> listOfObservables[];

listObservables.ToObservable()
    .Merge()
    .Aggregate(0, (acc, x) => acc+1)
    .Subscribe(x => Console.WriteLine("{0} items were run", x));
...