Функция очередей с использованием RxJS - PullRequest
0 голосов
/ 24 июня 2019

Я использую rxjs с NodeJS в бэкэнде. У меня есть Rest API, который позволяет пользователям запускать процесс удаленной установки пряжи. Функция install возвращает наблюдаемый процесс. Таким образом, когда модуль установлен успешно, он выдает значение в видимой и полной. На этом этапе Rest API вернет пользователю ответ об успешной установке. В случае сбоя установки процесс выдаст сообщение об ошибке в потоке, а Rest API вернет другой ответ с информацией об ошибке.

Моя проблема:

API вызывается потребителями несколько раз параллельно, поэтому в бэкэнде будут параллельные установки.

Я попытался оператор дроссельной заслонки создать очередь, но он поддерживает активным первый поток. Поэтому, если первый процесс «завершен», он возвращает «true», но поток не завершается

export class MyService {
    // the function called by the REST API
    installGlobal(moduleName: string): Observable < boolean > {
        // I think, there are something to do here to make it queuing
        return this.run('yarn', ['global', 'add', moduleName]);
    }

    private run(cmd: string, args: string[]): Observable < boolean > {
        const cmd$ = fromPromise(spawn(cmd, args)).pipe(
            map(stdout => {
                this.logger.info(`Install Module Successfully`);
                this.logger.info(`stdout: ${stdout.toString()}`);
                return true;
            }),
            catchError(error => {
                const errorMessage: string = error.stderr.toString();
                return _throw(errorMessage.substr(errorMessage.indexOf(' ') + 1));
            })
        );
        return cmd$;
    }
} 

Мои ожидания:

Либо существует несколько запросов, они должны быть поставлены в очередь. Таким образом, первый будет обработан, и все параллельные элементы должны быть поставлены в очередь. Когда первый обработан, он должен вернуть ответ потребителям API (например, завершено 200) и возобновить следующий поток из очереди.

[ОБНОВЛЕНИЕ-01 июля 2019 г.]: добавление примера

Демонстрацию кода можно получить по адресу stackblitz

Я переопределил существующий код, и я имитирую мой вызов API, подписавшись несколько раз на сервис, который вызовет очередь

1 Ответ

0 голосов
/ 25 июня 2019

Простой запрос в Rxjs можно выполнить, как показано ниже

const queque=new Subject()
// sequential processing
queue.pipe(concatMap(item=>yourObservableFunction(item)).subscribe()
// add stuff to the queue 
queque.next(item)
...