rxjs - наблюдаемый ничего не излучает (не завершает?) - PullRequest
0 голосов
/ 30 августа 2018

У меня есть приложение angular 6, и я изначально настроил свой бэкэнд с API отдыха, но я начинаю преобразовывать детали для использования socket.io.

Когда я возвращаю данные из моих остальных API, работает следующее:

this.http.get(api_url + '/versions/entity/' + entityId).pipe(
  mergeMap((versions:IVersion[]) => versions),
  groupBy((version:IVersion) => version.type),
  mergeMap(group => group.pipe(
    toArray(),
    map(versions=> {
      return {
        type: group.key,
        versions: versions
      }
    }),
    toArray()
  )),
  reduce((acc, v) => acc.concat(v), [])
);

Экспресс-маршрут:

router.get('/entity/:entityId', (req, res) => {
  const entityId = req.params.entityId;

  Version.getVersionsByEntity(entityId, (err, versions) => {
    if (err) {
      res.json({success: false, msg: err});
    } else {
      res.json(versions);
    }
  });
});

Который получает данные из моей базы данных mongo с помощью mongoose:

export function getVersionsByEntity(entityId, callback) {
  console.log('models/version - get versions by entity');

  Version.find({'entity.entityId': entityId})
          .exec(callback);
}

Однако, когда я делаю точно такой же вызов, но с socket.io, наблюдаемое ничего не возвращает. Я предполагаю, потому что это никогда не завершается? Отправляет ли вызов http сообщение «завершено» при успешной передаче данных?

Розетки отправляются с этого сервиса:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    // create observable to list to refreshJobs message
    let observable = new Observable(observer => {
      this._socketService.socket.on('versionsByEntity', (data) => {
        observer.next(data);
      });
    });

    this._socketService.event('versionsByEntity', entityId);

    return <Observable<IVersion[]>> observable;
  }

Который вызывает ту же функцию мангуста с сервера.

Наблюдаемое, возвращаемое службой (сокетом), на самом деле содержит данные, только когда я добавляю toArray() функции, она никогда ничего не печатает, когда я подписываюсь ..

Может ли кто-нибудь помочь мне исправить это, а также объяснить теорию, стоящую за этим? Разве это не завершение? Отправляет ли http сообщение «выполнено» с Angular?

РЕДАКТИРОВАТЬ: я создал простой стек стека, который делает то, что я хочу, но я хочу удалить take (1) из _dataService, так как я могу обновлять данные, поэтому я хочу оставить наблюдаемую открытой - https://stackblitz.com/edit/rxjs-toarray-problem

РЕДАКТИРОВАТЬ 2: Я близок, заменив toArray на оператор сканирования, но, похоже, он дважды генерирует массив. Reduce () выдает правильные данные, но, кажется, выдает только при полном (например, toArray), так что не лучше - https://stackblitz.com/edit/rxjs-toarray-problem-tpeguu

Ответы [ 2 ]

0 голосов
/ 31 августа 2018

В дополнение к тому, что сказал @ m1ch4ls, вы должны рассмотреть, как работает toArray. toArray преобразует все данные, заявленные Observable, в массив таких данных. Для того, чтобы это работало, Наблюдаемое должно быть завершено.

Observable, возвращаемый клиентом Angular http, завершается всегда после первого уведомления, и поэтому toArray работает. Поток socket.io завершается, когда он закрыт, поэтому использование toArray в таком потоке может привести к тому, что он никогда не получит никакого значения из Observable, если поток не закрыт.

С другой стороны, если вы хотите закрыть поток после одного уведомления, что происходит, если вы используете take(1), тогда вам лучше остаться с HTTP-запросами. Считается, что потоки сокетов являются чем-то вроде долгоживущих каналов, поэтому они не соответствуют их природе, если вы должны всегда закрывать их после передачи одного сообщения.

ОБНОВЛЕННАЯ ВЕРСИЯ после комментария

Вот код, который работает без take

 getData() {
    return this._dataService.getVersions().pipe(
      map(
        (versions: Array<any>) => versions.reduce(
          (acc, val) => {
            const versionGroup = (acc.find(el => el.type === val.type));
            if (versionGroup) {
              versionGroup.versions.push(val)
            } else {
              acc.push({type: val.type, versions: [val]})
            }
            return acc;
          }, []
        )
      ),
    )

Ключевым моментом для понимания является то, что ваш сервис возвращает вам Array вещей. Для достижения своего результата вы можете напрямую работать с этими Array методами Array, и вам не нужно использовать операторы Observable. Пожалуйста, не рассматривайте код логики группировки, который я использовал выше, как правильную реализацию - реальная реализация, вероятно, выиграет от использования таких вещей, как lodash для группировки, но я не хотел слишком усложнять вещи.

Это ваш оригинальный код

getData() {
    return this._dataService.getVersions().pipe(
      mergeMap((versions: Array<any>) => versions),
      groupBy((version:IVersion) => version.type),
      mergeMap(group => group.pipe(
        toArray(),
        map(versions=> {
          return {
            type: group.key,
            versions: versions
          }
        }),
        toArray()
      )),
      reduce((acc, v) => acc.concat(v), [])
    )
  }

Что это делает,

  1. вы создаете поток объектов, применяя первый mergeMap к Массив, возвращенный службой
  2. с помощью оператора groupBy вы создаете новый Observable, который испускает

  3. Объекты сгруппированы в соответствии с вашей логикой, затем вы вводите еще секунду комплекс mergeMap, который принимает массивы, испускаемые groupBy, превратить их в поток объектов, просто чтобы немедленно преобразовать их в массив снова через первый toArray, который затем получает преобразован в Объект типа {тип: строка, версии: []}, в затем, в конце концов, снова вызовите toArray, чтобы создать Array of Array

  4. Последнее, что вы делаете, это запускаете reduce, чтобы создать ваш окончательный массив

Почему работает только с take? Потому что groupBy выполняется только после завершения его исходного Observable, что имеет смысл, если вы хотите сгруппировать конечный набор вещей. Аналогично reduce Наблюдаемый оператор.

take - это способ дополнить источник Observable.

0 голосов
/ 30 августа 2018

Обработка потоков событий немного отличается. Я попытался предложить код ниже:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    return defer(() => {
        this._socketService.event('versionsByEntity', entityId);

        return fromEvent(this._socketService.socket, 'versionsByEntity')
            .pipe(take(1))
    }) as Observable<IVersion[]>;
}

Основная идея заключается в том, чтобы обернуть все в defer, что сделает Observable ленивым, и будет вызывать socketService.event только когда он подписан (ваша первоначальная реализация стремится сразу же вызвать socketService.event). Реализация может привести к непредвиденным последствиям - легко пропустить событие, если подписка на Observable подписана слишком поздно.

Я также предлагаю использовать fromEvent Наблюдаемая фабрика - которая обрабатывает настройку прослушивателя событий и уничтожает ее.

Наконец, чтобы завершить Наблюдение после первой эмиссии, я добавил take(1) - это ограничит количество выбросов до 1 и откажется от подписки на Наблюдение.

...