Rxjs in nestjs - Наблюдаемая ошибка подписки - PullRequest
0 голосов
/ 22 февраля 2019

Я новичок, пробующий rxjs и nestjs.Вариант использования, который я сейчас пытаюсь выполнить, предназначен для образовательных целей.Поэтому я хотел прочитать файл json (выдать наблюдаемую ошибку в случае, если файл пуст или не может быть прочитан) с помощью модуля "fs".Теперь я создаю наблюдаемый, асинхронно читая файл, устанавливаю наблюдателя в теме, а затем подписываюсь на субъект в контроллере .Вот мой код в сервисе

@Injectable()
export class NewProviderService {
    private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
    // this is the variable that should be exposed. make the subject as private
    // this allows the service to be the sole propertier to modify the stream and
    // not the controller or components
    serviceSubject$:  Observable<HttpResponseModel[]>;

    private serviceErrorSubject: BehaviorSubject<any>;
    serviceErrorSubject$: Observable<any>;
    filePath: string;
    httpResponseObjectArray: HttpResponseModel[];
    constructor() {
        this.serviceSubject = new BehaviorSubject<HttpResponseModel[]>([]);
        this.serviceSubject$ = this.serviceSubject.asObservable();

        this.serviceErrorSubject = new BehaviorSubject<any>(null);
        this.serviceErrorSubject$ = this.serviceErrorSubject.asObservable();

        this.filePath = path.resolve(__dirname, './../../shared/assets/httpTest.json');
    }
     readFileFromJson() {
          return new Promise((resolve, reject) => {
            fs.exists(this.filePath.toString(), exists => {
                if (exists) {
                    fs.readFile(this.filePath.toString(), 'utf-8' , (err, data) => {
                        if (err) {
                            logger.info('error in reading file', err);
                            return reject('Error in reading the file' + err.message);
                        }

                        logger.info('file read without parsing fg', data.length);
                        if ((data.length !== 0) && !isNullOrUndefined(data) && data !== null) {
                            // this.httpResponseObjectArray = JSON.parse(data).HttpTestResponse;
                            // logger.info('array obj is:', this.httpResponseObjectArray);
                            logger.info('file read after parsing new', JSON.parse(data));
                            return resolve(JSON.parse(data).HttpTestResponse);
                        } else {
                            return reject(new FileExceptionHandler('no data in file'));
                        }
                    });
                } else {
                    return reject(new FileExceptionHandler('file cannot be read at the moment'));
                }
            });
          });
    }


        getData() {
                 from(this.readFileFromJson()).pipe(map(data => {
                        logger.info('data in obs', data);
                        this.httpResponseObjectArray = data as HttpResponseModel[];
                        return this.httpResponseObjectArray;
                 }), catchError(error => {
                     return Observable.throw(error);
                 }))
                 .subscribe(actualData => {
                    this.serviceSubject.next(actualData);
                }, err => {
                    logger.info('err in sub', typeof err, err);
                    this.serviceErrorSubject.next(err);
                });
            }

Теперь это класс контроллеров

@Get('/getJsonData')
public async getJsonData(@Req() requestAnimationFrame,@Req() req, @Res() res) {

  await this.newService.getData();
  this.newService.serviceSubject$.subscribe(data => {
    logger.info('data subscribed', data, _.isEmpty(data));
    if (!isNullOrUndefined(data) && !_.isEmpty(data)) {
      logger.info('coming in');
      res.status(HttpStatus.OK).send(data);
      res.end();
    }
  });
}

Проблема, с которой я сталкиваюсь, заключается в том, что я могу получить информацию о файле в первый раз и подпискувызывается один раз> работает нормально.При последующих запросах

Error [ERR_HTTP_HEADERS_SENT]: Cannot set headers after they are sent to the client
    at ServerResponse.setHeader (_http_outgoing.js:470:11)
    at ServerResponse.header (C:\personal\Node\test-nest.js\prj-sample\node_modules\express\lib\response.js:767:10)
    at Ser

и конечной точке / getJsonData возникает ошибка.Может ли кто-нибудь помочь мне.Я полагаю, что после первого звонка подписка не работает должным образом, но я не уверен, как это закончить и как решить эту проблему

Ответы [ 3 ]

0 голосов
/ 22 февраля 2019

Я думаю, что попытка преобразовать наблюдаемую в холодном состоянии (созданную мною) в наблюдаемую в горячем / теплом виде может помочь подключиться к одному источнику, выполнить и завершить его выполнение и сохранить последние переданные данные в любых клонированных значениях.Поэтому я делаю холодную наблюдаемую теплой наблюдаемой, используя операторы publishLast (), refCount (), и я могу добиться единой подписки и завершения выполнения наблюдаемой.Вот изменения, которые я сделал для работы.

Это изменение класса обслуживания, которое я сделал

 getData() {
        return from(this.readFileFromJson()).pipe(map(data => {
                logger.info('data in obs', data);
                this.httpResponseObjectArray = data as HttpResponseModel[];
                return this.httpResponseObjectArray;
         }), publishLast(), refCount()
          , catchError(error => {
             return Observable.throw(error);
         }));
        //  .subscribe(actualData => {
        //     this.serviceSubject.next(actualData);
        // }, err => {
        //     logger.info('err in sub', typeof err, err);
        //     this.serviceErrorSubject.next(err);
        // });
    }

И это изменение, которое я сделал в контроллере

public async getJsonData(@Req() req, @Res() res) {
    let jsonData: HttpResponseModel[];
    await this.newService.getData().subscribe(data => {
      logger.info('dddd', data);
      res.send(data);
    });
}

Любые ответы, которые позволяют наблюдаемым сначала подписаться на темы, а затем подписывать эту тему в контроллере, также приветствуются.

Я нашел отличный пост о горячих и холодных наблюдаемых и о том, как заставить наблюдаемые подписаться наодин источник и преобразование холода в горячую / теплую наблюдаемую - https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html

0 голосов
/ 23 февраля 2019

Проблема в том, что вы подписываетесь на ваш serviceSubject в вашем контроллере.Каждый раз, когда выдается новое значение, он пытается отправить ответ.Это работает в первый раз, но во второй раз он скажет вам, что не может отправить тот же ответ снова;запрос уже обработан.

Вы можете использовать оператор pipeable first() для завершения Observable после первого значения:

@Get('/getJsonData')
public async getJsonData() {
  await this.newService.getData();
  return this.newService.serviceSubject$.pipe(first())
}

Вы хотите, чтобы ваш Observableбыть разделенным (горячим), чтобы каждый подписчик всегда получал одно и то же последнее значение.Это именно то, что делает BehaviourSubject.Поэтому вам не следует конвертировать Subject в Observable при публичном раскрытии, поскольку вы потеряете это желаемое поведение.Вместо этого вы можете просто привести ваш Subject к Observable, чтобы внутренне он все еще был субъектом, но он не предоставил бы метод next() для публичного генерирования новых значений:

private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
get serviceSubject$(): Observable<HttpResponseModel[]> {
   return this.serviceSubject;
}
0 голосов
/ 22 февраля 2019

Я бы рекомендовал вернуть Promise непосредственно на контроллер.Здесь вам не нужно Observable.Для подписчиков вы дополнительно выдаете значение Promise на serviceSubject.

async getData() {
  try {
    const data = await this.readFileFromJson();
    this.serviceSubject.next(data as HttpResponseModel[]);
    return data;
  } catch (error) {
    // handle error
  }
}

. В вашем контроллере вы можете просто вернуть Promise:

@Get('/getJsonData')
public async getJsonData() {
  return this.newService.getData();
}
...