Я пытаюсь выполнить повторную попытку сети, используя оператор Rx JS repeatWhen
. Идея состоит в том, что, когда новый запрос поступает в планировщик, я пытаюсь выполнить запрос напрямую, и если он приводит к сбою сети, я добавляю его в пул, к которому можно будет повторить попытку позже. Таким образом, отправной точкой моего планировщика является функция очереди, которая выполняет такую работу:
queue({ operation, body }) {
const behaviourSubject = new BehaviorSubject();
const task = {
operation,
body,
behaviourSubject,
};
this.doTask(task).subscribe({
error: _ => this.notifier.next({ tasks: [task], remove: false }),
complete: () => console.log('Task successfully done on first try: ', task),
});
return behaviourSubject;
}
и this.notifier
- это Subject
, который используется в качестве уведомителя работника. Таким образом, сам рабочий выглядит так:
const rawWorker = new Observable(subscriber => {
const doneTasks = [];
const jobs = [];
for (const task of this.tasks) {
jobs.push(
this.doTask(task).pipe(
tap(_ => {
doneTasks.push(task);
}),
catchError(error => { task.behaviourSubject.next(error); return of(error); }),
)
);
}
if (jobs.length > 0) {
forkJoin(...jobs).subscribe(_ => {
this.notifier.next({ tasks: doneTasks, remove: true });
subscriber.next(`One cycle of worker done. #${doneTasks.length} task(s) done and #${this.tasks.length} remaining.`);
// subscriber.complete();
if (this.tasks.length > 0) {
this.notifier.next();
}
})
} else {
subscriber.complete();
}
});
const scheduledWorker = rawWorker.pipe( // TODO: delay should be added to retry and repeat routines
retry(),
repeatWhen(_ => this.notifier.pipe(
filter(_ => this.tasks.length > 0),
)),
);
, а также уведомитель отслеживает все отмененные запросы в массиве, например так:
this.notifierSubscription = this.notifier
.pipe(
filter(data => data && data.tasks)
)
.subscribe({
next: ({ tasks = [], remove = false }) => {
if (remove) {
console.log('removing tasks: ', tasks);
this.tasks = this.tasks.filter(task => !tasks.some(tsk => task === tsk));
} else {
console.log('inserting: ', tasks);
this.tasks.push.apply(
this.tasks,
tasks,
);
}
console.log('new tasks array: ', this.tasks);
}
});
Как я знаю, если цикл Рабочий не завершен, тогда повторюсь, когда делать нечего. Например, если я удаляю часть:
else {
subscriber.complete();
}
из рабочего, с первой попытки рабочего (пустой цикл) наблюдаемое не завершается и повторяется, когда ничего не будет сделано. Но с другой стороны, как вы видите, я прокомментировал // subscriber.complete();
, когда существуют рабочие места, но повторение происходит. И худшая часть проблемы заключается в том, что много разных экземпляров работника работают параллельно, что делает много повторяющихся запросов.
Я потратил много времени на эту проблему, но не имею никакой подсказки для отслеживания.