Прежде всего, я не очень знаком с Observables, поэтому я должен попросить вас, ребята, внести ясность в мою голову. Я работаю над веб-приложением, в котором есть Angular клиент и сервер Rails. Для каждого запроса от клиента к серверу я хотел бы создать задачу и добавить ее в список задач. Это мой класс ServerTask:
export class ServerTask {
private _state$ = new BehaviorSubject<ServerTaskState>({type:"pending"});
readonly state$ = this._state$.asObservable();
readonly createdAt: number;
readonly id: string;
constructor(readonly description: string) {
this.id = generateUUIDv4();
this.createdAt = Date.now();
}
get state() {
return this._state$.value;
}
public succeeded() {
this._state$.next({type: "success"});
this._state$.complete();
}
public failed(message: string) {
this._state$.next({type: "failure", message});
this._state$.complete();
}
}
Итак, в моем servertask.service.ts есть субъект, который принимает все созданные задачи:
private readonly _newTaskEvent$ = new Subject<ServerTask>();
Для буферизации фиксированного количества задач Я использую scan:
private readonly _internalTasks$ : Observable<ServerTask[]>= this._newTaskEvent$.pipe(
scan((acc, cur) => [...acc, cur].slice(-10), []),
startWith([]),
debounceTime(0),
shareReplay(1)
);
Оттуда я хотел бы получить список всех задач, которые имеют, например, состояние $ "в ожидании". Поэтому я создал функцию для фильтрации задач по состоянию:
private getTasks(state: ServerTaskStateType,cmap:(value:ServerTask)=>any): Observable<PublicServerTask[]> {
return this._internalTasks$.pipe(
mergeMap(async (tasks)=> {
return tasks
.filter(async (t) => {
const promise = t.state$.pipe(first()).toPromise();
const resolved = await promise;
return resolved.type === state;})
.map(cmap)}),
)
}
, и вот как я пытаюсь ее использовать:
readonly pendingTasks$: Observable<PendingServerTask[]> = this.getTasks("pending",(t)=>(
{description: t.description,}
));
readonly succeededTasks$: Observable<SucceededServerTask[]> = this.getTasks("success",(t)=>(
{description: t.description,}
));
readonly failedTasks$: Observable<FailedServerTask[]> = this.getTasks("failure",(t)=>(
{description: t.description,message:t.state.message}
));
Когда одна задача T1 создается и отправляется в newTaskEvent $, pendingTasks$.pipe(first()).toPromise()
содержит T1. Затем вызывается функция t1.succeeded (), а затем succeededTasks$.pipe(first()).toPromise()
также содержит T1. Так что все правильно! Проблема заключается в том, что pendingTasks$.pipe(first()).toPromise()
по-прежнему содержит t1, который теперь должен быть доступен только в последующих задачах $.
Я понятия не имею, как решить эту проблему или возможно ли с этой структурой в моем сервисе. Спасибо вам, ребята, за любую помощь!
РЕДАКТИРОВАТЬ:
Вот тест, как должен вести себя конвейер:
function firstPromise<T>(obs: Observable<T>) {
return obs.pipe(first()).toPromise();
}
it("Single pending task later succeeds", async () => {
const t = instantiate();
// Create and add Task to _newTaskEvent$
const task = mkTaskPending("t1");
t.service.addTask(task);
// Pending first
const pending = await firstPromise(t.service.pendingTasks$);
expect(pending.map((t) => t.description)).toEqual(["t1"]);
const succeeded = await firstPromise(t.service.succeededTasks$);
expect(succeeded.map((t) => t.description)).toEqual([]);
// Changed to succeed
task.succeeded();
// Now succeeded !!--Error: Timeout - Async function did not complete--!!
const succeededLater = await firstPromise(t.service.succeededTasks$);
expect(succeededLater.map((t) => t.description)).toEqual(["t1"]);
// Nothing remains pending
const pendingLater = await firstPromise(t.service.pendingTasks$);
expect(pendingLater.map((t) => t.description)).toEqual([]);
});