Я пытаюсь ввести Rx в своем мобильном приложении Xamarin, и я хочу связать последовательность вызовов во время фазы входа в систему при запуске приложения.
TL; DR; Как запустить 2/3 наблюдаемых один за другим для правильного извлечения данных и настройки потоков.
Подробнее я выполняю следующее:
- Вход Пользователь и получить соответствующие данные
LoginUser() : IObservable<User>
- На основе типа пользователя идентификатор либо
RetrieveRemoteA() : IObservable<A>
или RetrieveRemoteB() : IObservable<B>
- Как только данные доступны (либо типа
A
или B
), Я обновляю пользовательский интерфейс.
Это своего рода диаграмма для описания потока (объяснено выше):
Учитывая, что я хочу избежать вызова новых наблюдаемых (источников) из Subscribe()
предыдущего, это то, что я реализовал для извлечения моих данных и обновления пользовательского интерфейса (поток в изображении был сериализован в коде ниже).
IObservable<User> loginUserObservable = LogInUser(currentUser);
loginUserObservable
.SubscribeOn(ThreadPoolScheduler.Instance)
.SelectMany(
(user) =>
{
if (user.Type == UserType.A)
return RetrieveRemoteA(user.UserId); // outputs IObservable<A>
return Observable.Return(new A());
},
(user, a) =>
{
B b = null;
return new { user, a, b }; // Create anonymous type to keet track of 'user'
})
.SelectMany(
(xType) =>
{
if (xType.user.Type == UserType.B)
return RetrieveRemoteB(xType.user.UserId); // outputs IObservable<B>
return Observable.Return(new B());
},
(xType, bData) =>
{
var user = xType.user;
var a = xType.a;
var b = b;
return new { user, a, b };
})
.ObserveOn(ImmediateScheduler.Instance)
.Select((xType) =>
{
if (xType.user.Type == UserType.A)
{
A a = xType.a;
B b = null;
return new { a, b };
}
else {
A a = null;
B b = xType.b;
return new { a, b };
}
})
.Subscribe((result) =>
{
if (result.a != null)
{
Console.WriteLine($"ID: {result.a.Id}");
}
else {
Console.WriteLine($"ID: {result.b.Id}");
}
});
После запуска кажется, что поток суммируется с n RetrieveRemoteA(user.UserId)
, даже если метод завершается без ошибок.
public IObservable<A> RetrieveRemoteA(string userId)
{
return Observable.FromAsync<A>(async () =>
{
A a = await CustomAPI(userId)
return a;
}
}
Когда я реализую поток последовательно, вызывая каждую новую наблюдаемую часть в подписке предыдущего, он работает правильно (но это не правильный способ сделать это).
Я думаю, что это проблема с Threads или с моей неправильной реализацией Rx.
У вас есть какие-либо подсказки, пожалуйста?