Rx Do tNet - Сцепление наблюдаемых с резьбой - PullRequest
1 голос
/ 07 апреля 2020

Я пытаюсь ввести Rx в своем мобильном приложении Xamarin, и я хочу связать последовательность вызовов во время фазы входа в систему при запуске приложения.

TL; DR; Как запустить 2/3 наблюдаемых один за другим для правильного извлечения данных и настройки потоков.

Подробнее я выполняю следующее:

  1. Вход Пользователь и получить соответствующие данные LoginUser() : IObservable<User>
  2. На основе типа пользователя идентификатор либо RetrieveRemoteA() : IObservable<A> или RetrieveRemoteB() : IObservable<B>
  3. Как только данные доступны (либо типа A или B), Я обновляю пользовательский интерфейс.

Это своего рода диаграмма для описания потока (объяснено выше):

enter image description here

Учитывая, что я хочу избежать вызова новых наблюдаемых (источников) из Subscribe() предыдущего, это то, что я реализовал для извлечения моих данных и обновления пользовательского интерфейса (поток в изображении был сериализован в коде ниже).

IObservable<User> loginUserObservable = LogInUser(currentUser);

loginUserObservable
    .SubscribeOn(ThreadPoolScheduler.Instance)
    .SelectMany(
        (user) =>
        {
            if (user.Type == UserType.A)
                return RetrieveRemoteA(user.UserId); // outputs IObservable<A>

            return Observable.Return(new A());
        },
        (user, a) =>
        {
            B b = null;
            return new { user, a, b }; // Create anonymous type to keet track of 'user'
        })
    .SelectMany(
        (xType) =>
        {
            if (xType.user.Type == UserType.B)
                return RetrieveRemoteB(xType.user.UserId); // outputs IObservable<B>

            return Observable.Return(new B());
        },
        (xType, bData) =>
        {
            var user = xType.user;
            var a = xType.a;
            var b = b;

            return new { user, a, b };
        })
    .ObserveOn(ImmediateScheduler.Instance)
    .Select((xType) =>
    {
        if (xType.user.Type == UserType.A)
        {
            A a = xType.a;
            B b = null;
            return new { a, b };
        }
        else {
            A a = null;
            B b = xType.b;
            return new { a, b };
        }
    })
    .Subscribe((result) =>
    {
        if (result.a != null)
        {
            Console.WriteLine($"ID: {result.a.Id}");
        }
        else {
            Console.WriteLine($"ID: {result.b.Id}");
        }
    });

После запуска кажется, что поток суммируется с n RetrieveRemoteA(user.UserId), даже если метод завершается без ошибок.

public IObservable<A> RetrieveRemoteA(string userId)
{
    return Observable.FromAsync<A>(async () =>
    {
        A a = await CustomAPI(userId) 
        return a;
    }
}

Когда я реализую поток последовательно, вызывая каждую новую наблюдаемую часть в подписке предыдущего, он работает правильно (но это не правильный способ сделать это).

Я думаю, что это проблема с Threads или с моей неправильной реализацией Rx.

У вас есть какие-либо подсказки, пожалуйста?

Ответы [ 2 ]

1 голос
/ 13 апреля 2020

То, что вы хотите сделать, на самом деле довольно легко, используя оператор Observable.Amb. Его задача - разрешить запуск двух наблюдаемых, но как только одна из них возвращает значение, а затем игнорирует другую.

Вот как написать запрос:

var query =
    from user in loginUserObservable
    let a_observable = user.Type == UserType.A ? RetrieveRemoteA(user.UserId) : Observable.Never<A>()
    let b_observable = user.Type == UserType.B ? RetrieveRemoteB(user.UserId) : Observable.Never<B>()
    from ab in
        Observable
            .Amb(
                a_observable.Select(x => new { a = x, b = (B)null }),
                b_observable.Select(x => new { a = (A)null, b = x }))
    select ab;

Вот и все.

Учитывая эти примеры данных, вы можете сыграть:

var currentUser = "";

IObservable<User> LogInUser(string cu) => Observable.Start(() => new User() { Type = UserType.A, UserId = "Z1" });
IObservable<A> RetrieveRemoteA(string id) => Observable.Start(() => new A() { Id = "A.Z2" });
IObservable<B> RetrieveRemoteB(string id) => Observable.Start(() => new B() { Id = "B.Z3" });

IObservable<User> loginUserObservable = LogInUser(currentUser);

/* put `query` here */

query
    .Subscribe(x =>
        Console.WriteLine($@"ID: {(x.a != null ? x.a.Id : x.b.Id)}"));

В качестве альтернативы, вы можете просто полностью избежать оператора Amb, выполнив этот простой запрос:

var query =
    from user in loginUserObservable
    from ab in
        user.Type == UserType.A
            ? RetrieveRemoteA(user.UserId).Select(x => new { a = x, b = (B)null })
            : RetrieveRemoteB(user.UserId).Select(x => new { a = (A)null, b = x })
    select ab;
0 голосов
/ 07 апреля 2020

Я думаю, вы можете свернуть это в нечто вроде следующего. Reactive.Extensions могут обрабатывать асинхронные c методы как наблюдаемую единственную возвращаемость, поэтому нет необходимости заключать вызов asyn c в метод RetrieveRemoteA. Мы можем использовать это поведение в SelectMany, чтобы оно вызывало API, извлекало вашу информацию о пользователе и ожидало ответа, прежде чем оно сообщит дальше по цепочке.

Для ваших потоков я бы рекомендовал принять взгляните на добавление ReactiveUI в ваше приложение и использование их планировщиков . Например, ImmediateScheduler.Instance не гарантирует, что код, который вы выполняете в своей подписке, будет уведомлен в потоке пользовательского интерфейса.

IObservable<User> loginUserObservable = LogInUser(currentUser);

loginUserObservable
    .SubscribeOn(ThreadPoolScheduler.Instance)
    .SelectMany(
        async user =>
        {
            A userA = null;
            if (user.Type == UserType.A)
                userA = await CustomAPI(user.UserId).ConfigureAwait(false);

            B userB = null;
            if (user.Type == UserType.B)
                userB = await CustomAPI(user.UserId).ConfigureAwait(false);

            return new { UserA = userA, UserB = userB }
        })
    .ObserveOn(ImmediateScheduler.Instance)
    .Subscribe((result) =>
    {
        if (result.a != null)
        {
            Console.WriteLine($"ID: {result.UserA.Id}");
        }
        else {
            Console.WriteLine($"ID: {result.UserB.Id}");
        }
    });
...