Добавление каналов после подписки на услугу push-уведомлений - PullRequest
2 голосов
/ 07 июня 2019

Положение:
Я столкнулся с примером использования системы rxjs Observable, где мне может понадобиться добавить pipe d команд в Subscription после его запуска.

В моем случае приложение, над которым я работаю, должно пассивно прослушивать систему push-уведомлений. Через эту систему может быть выдвинуто несколько сообщений, на которые должна отвечать моя система. Однако , есть предсказуемый случай, когда динамически загружаемое представление, которое будет реализовано в будущем, должно будет добавить слушателя в систему push-уведомлений.

Вопрос:
Учитывая, что мое приложение находится в состоянии, в котором мой Subscription уже существует, могу ли я добавить дополнительный канал после вызова .subscribe(() => {})?

// this.something is an Observable<any>, for discussion purposes.
const subscription = this.something.subscribe(() => { // commands });

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

... И если я это сделаю, то что произойдет, если что?

Решение:
Оба ответа @ user2216584 и @SerejaBogolubov оба имели аспект ответа на этот вопрос.

Моему высокоуровневому сервису прослушивания push-уведомлений нужно было сделать две вещи:

  1. удерживайте подписку и
  2. Уметь рисовать из списка слушателей.

Сложность заключается в том, что каждый слушатель должен прослушивать разные сообщения. Иными словами, если я получаю сообщение на foo_DEV, приложение должно сделать что-то другое, чем если бы система push-уведомлений отправляет сообщение на bar_DEV.

Итак, вот что я придумала:

export interface PushNotificationListener {
  name: string,
  onMessageReceived: (msg: PushNotificationMessage) => any,
  messageSubject$: Subject<PushNotificationMessage>
}

export class PushNotificationListenerService {
  private connection$: Observable<PushNotificationConnection>;
  private subscription$: Subscription;

  private listeners: PushNotificationListener[] = [];

  constructor(
    private connectionManager: PushNotificationConnectionManager
  ) {
  }

  connect() {
    // Step 1 - Open the socket connection!
    this.connection$ = this.connectionManager.connect(
      // The arguments for setting up the websocket are unimportant here.
      // The underlying implementation is similarly unimportant.
    );
  } 

  setListener(
    name: string,
    onMessageReceived: (msg: PushNotificationMessage) => any
  ) {
    // Step 3...or maybe 2...(shrug)...
    // Set listeners that the subscription to the high-order connection
    // will employ.
    const newListener: PushNotificationListener = {
      name: name,
      onMessageReceived: onMessageReceived,
      messageSubject$: null
    };

    this.listeners.push(newListener);
  }

  listen() {
    // Step 2 - Listen for changes to the high-order connection observable.
    this.subscription$ = this.connection$
      .subscribe((connection: PushNotificationConnection) => {
        console.info('Push notification connection established');

        for (let listener of this.listeners) {
         listener.messageSubject$ = connection.subscribe(listener.name);
         listener.messageSubject$.subscribe((message: PushNotificationMessage) => {
           listener.onMessageReceived(message);
         }
        }
      },
      (error: any) => {
        console.warn('Push notification connection error', error);
      }
  }
}

Благодаря тщательному изучению внутреннего кода, который составляет ядро ​​моей системы push-уведомлений, я обнаружил, что у нас уже есть Observable более высокого порядка. Код веб-сокета создает наблюдаемую (connectionManager.connect()), которая должна быть кэширована в службе и подписана. Поскольку этот код относится к месту, где я работаю, я больше ничего не могу сказать о нем.

Однако кэширование слушателей также важно! Вызов subscribe в .listen() просто выполняет итерацию всех подключенных прослушивателей в любое время, когда соединение изменяет состояние, поэтому я могу мгновенно добавлять прослушиватели через .addListener(), и из-за того, что система rxjs Observable по сути работает, И тот факт, что я работаю из списка слушателей, находящихся в области действия, у меня есть система, с помощью которой я могу динамически устанавливать слушателей, даже если .connect() вызывается до того, как настроены какие-либо слушатели.

Этот код, вероятно, все еще может выиграть от редизайна / рефакторинга, но у меня есть кое-что, что работает, что является важным первым шагом любого хорошего кодирования. Спасибо всем!

Ответы [ 2 ]

3 голосов
/ 07 июня 2019

[Я редактирую свой ответ, потому что предыдущий ответ был в соответствии с самым первым кодом, предоставленным автором; Как уже упоминалось в комментарии, автор изменил / исправил код] -

Я сомневаюсь, что следующий код повлияет на что-либо в подписке -

this.something.pipe(
  map((something) => {
    // ...Commands that I want to add to the subscription...
  })
);

Вы можете попробовать функцию более высокого порядка при первоначальной настройке наблюдаемой, и, если функция более высокого порядка находится в области действия, вы можете переназначить ее. Я также сомневаюсь, что это будет работать из-за следующих причин -

  1. Когда Observable установлен, Observable сохраняет ссылку на переданную функцию, которая будет вызываться при подписке [https://medium.com/@benlesh/learning-observable-by-building-observable-d5da57405d87]. Теперь, если вы переназначаете функцию более высокого порядка, тогда наблюдаемая функция по-прежнему указывает на старую ссылка. Переназначая функцию более высокого порядка, вы не изменили исходную ссылку на функцию, которая была установлена ​​при первоначальной настройке наблюдаемой.

  2. Предположим, что по какой-то причине переназначение более высокого порядка работает. В этом случае также существует большая вероятность того, что перед выполнением старшей функции более высокого порядка вы могли бы переназначить функцию более высокого порядка (потому что, если исходная наблюдаемая делает Асинхронный вызов бэкэнда, пока ожидается код, цикл событий javascript мог переназначить функцию более высокого порядка, и когда асинхронный вызов возвращается, он выполнит новую назначенную функцию более высокого порядка). Может быть, этот код прояснит мою точку зрения -

let upperOrderFunc = map (x => x * 2);

this.something
    .pipe(
          mergeMap(_ => //call to backend; async call),
          higherOrderFunc,
         ).subscribe();
higherOrderFunc = map(x => x * 3); // this will execute before async call completes
2 голосов
/ 07 июня 2019

Ну, вы могли бы сделать это довольно легко.Скажем, вам нужна задержка во время выполнения map.Чем вы делаете что-то вроде map(this.myMapper), где myMapper - приватное поле, видимое в соответствующей области видимости.Изменяя это приватное поле, вы можете добавить / удалить дополнительные варианты поведения.Например, map(x => x) будет означать отсутствие какого-либо сопоставления.

Однако мне кажется, что вы злоупотребляете rxjs.Скорее всего, то, что вам действительно нужно, это наблюдаемый справа более высокого порядка (наблюдаемый, который испускает наблюдаемый «поток потоков»).Это было бы намного более rxjs ic и более чистое решение.Так что подумай дважды.

...