Использование Rx Framework для асинхронных вызовов с использованием шаблона void AsyncMethod (Action <T>callback) - PullRequest
2 голосов
/ 29 апреля 2011

Я видел множество примеров того, как использовать Observable.FromAsyncPattern () в Rx Framework для упрощения асинхронных вызовов, но я использую интерфейс, который не использует стандартный шаблон Async IAsyncResult BeginXXX / EndXXX (IAsyncResult), так что это не работает для меня.

Библиотека, с которой я работаю, предоставляет асинхронные функции с паттерном обратного вызова:

void GetAllObjects(Action<List<Object>> callback)

В идеальном мире я хотел бы превратить это:

var isLoadingUsers = true;
var isLoadingSystems = true;
var isLoadingCustomers = true;
var isLoadingRules = true;

mClient.GetAllUsers(UsersCallback);
mClient.GetAllCustomers(CustomersCallback);
mClient.GetAllRules(RulesCallback);

// set the IsLoadingXXX variables to false in callbacks
// once all are false

mClient.GetAllSystems(SystemsCallback);

примерно так:

var o = Observable.ForkJoin(
                     Observable.Start(GetAllUsers()),
                     Observable.Start(GetAllCustomers()),
                     Observable.Start(GetAllRules())
                    ).Finally(() => GetAllSystems);

Как можно превратить этот шаблон в нечто, что возвращает IObservable?

Ответы [ 3 ]

2 голосов
/ 29 апреля 2011
Func<IObservable<TRet>> FromListCallbackPattern(Action<Action<List<TRet>>> function)
{
    return () => {
        // We use a ReplaySubject so that if people subscribe *after* the
        // real method finishes, they'll still get all the items
        ret = new ReplaySubject<TRet>();

        function((list) => {
            // We're going to "rebroadcast" the list onto the Subject
            // This isn't the most Rx'iest way to do this, but it is the most
            // comprehensible :)
            foreach(var v in list) {
                ret.OnNext(v);
            }
            ret.OnCompleted();
        });

        return ret;
    };
}

Теперь вы можете сделать что-то вроде:

var getAllUsers = FromListCallbackPattern(mClient.GetAllUsers);
getAllUsers().Subscribe(x => /* ... */);
1 голос
/ 30 апреля 2011

Мне нравится Observable.Create для этого, но ответ @dahlbyk неверен (пропускает завершение и выполняет действие в обработчике отмены подписки).Должно быть что-то вроде этого:

    IObservable<List<T>> FromListCallbackPattern<T>(
        Action<Action<List<T>>> listGetter)
    {
        return Observable
            .Create<List<T>>(observer =>
            {
                var subscribed = true;
                listGetter(list =>
                {
                    if (!subscribed) return;
                    observer.OnNext(list);
                    observer.OnCompleted();
                });
                return () =>
                {
                    subscribed = false;
                };
            });
    }

Кроме того, поскольку исходный API возвращает весь список целиком, я не вижу причины для его преобразования в наблюдаемый слишком рано.Пусть результирующая наблюдаемая также возвращает список, и если вызывающий объект должен сгладить его, он может использовать .SelectMany

1 голос
/ 29 апреля 2011

Попробуйте Observable.Create(), возможно, что-то вроде этого:

public IObservable<Object> ObserveAllObjects()
{
    return Observable.Create<Object>(
        observer =>
            () => GetAllObjects(objects => objects.ForEach(o => observer.OnNext(o))));
}
...