Как отслеживать количество подписок RXJS? - PullRequest
21 голосов
/ 18 мая 2019

Я использую Observable для предоставления интерфейса подписки на события для клиентов из глобального ресурса, и мне нужно управлять этим ресурсом в соответствии с количеством активных подписок:

  • Выделить глобальный ресурскогда количество подписок становится больше 0
  • Освободить глобальный ресурс, когда количество подписок становится 0
  • Отрегулируйте стратегию использования ресурсов на основе количества подписок

Как правильно в RXJS отслеживать количество активных подписок?


Как реализовать следующее в синтаксисе RXJS?-

const myEvent: Observable<any> = new Observable();

myEvent.onSubscription((newCount: number, prevCount: number) => {
   if(newCount === 0) {
      // release global resource
   } else {
      // allocate global resource, if not yet allocated
   }
   // for a scalable resource usage / load,
   // re-configure it, based on newCount
});

Я не ожидал бы гарантированного уведомления о каждом изменении, следовательно, newCount + prevCount params.

UPDATE-1

Это не дубликат этого , потому что мне нужно получать уведомления при изменении количества подписок, а не просто получать счетчик в какой-то момент.

ОБНОВЛЕНИЕ-2

Без какого-либо ответа, я быстро придумал очень уродливый и ограниченный обходной путь , через полную инкапсуляцию, и специально для типа Subject.Очень надеясь найти правильное решение.

ОБНОВЛЕНИЕ-3

После нескольких ответов я все еще не уверен, как реализовать то, что я пытаюсь,что является следующим:

class CustomType {

}

class CountedObservable<T> extends Observable<T> {

    private message: string; // random property

    public onCount; // magical Observable that needs to be implemented

    constructor(message: string) {
        // super(); ???
        this.message = message;
    }

    // random method
    public getMessage() {
        return this.message;
    }
}

const a = new CountedObservable<CustomType>('hello'); // can create directly

const msg = a.getMessage(); // can call methods

a.subscribe((data: CustomType) => {
    // handle subscriptions here;
});

// need that magic onCount implemented, so I can do this:
a.onCount.subscribe((newCount: number, prevCont: number) => {
    // manage some external resources
});

Как реализовать такой класс CountedObservable выше, который позволил бы мне подписаться на себя, а также на его свойство onCount для контроля количества его клиентов / подписок?

ОБНОВЛЕНИЕ-4

Все предложенные решения показались слишком сложными, и хотя я принял один из ответов, я получил полностью индивидуальное решение одномоего собственного .

Ответы [ 4 ]

5 голосов
/ 27 мая 2019

Вы действительно задаете здесь три отдельных вопроса, и я задаюсь вопросом, действительно ли вам нужны все возможности, о которых вы упомянули.Поскольку большая часть запрашиваемого управления ресурсами уже предоставлена ​​библиотекой, выполнение пользовательского кода отслеживания представляется излишним.Первые два вопроса:

  • Выделить глобальный ресурс, когда количество подписок станет больше 0
  • Освободить глобальный ресурс, когда количество подписок станет 0

Может быть сделано с помощью операторов using + share:

class ExpensiveResource {
  constructor () {
    // Do construction
  }
  unsubscribe () {
   // Do Tear down
  }
}

// Creates a resource and ties its lifecycle with that of the created `Observable`
// generated by the second factory function
// Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function.
const sharedStream$ = using(
  // Creates an expensive resource
  () => new ExpensiveResource(), 
  // Passes that expensive resource to an Observable factory function
  er => timer(1000)
)
// Share the underlying source so that global creation and deletion are only
// processed when the subscriber count changes between 0 and 1 (or visa versa)
.pipe(share())

После этого sharedStream$ может передаваться как базовый поток, который будет управлять базовым ресурсом (при условии, что вы реализовали unsubscribe правильно), так что ресурс будет создан и разрушен при переходе числа подписчиков между 0 и 1.

  • Настройте стратегию использования ресурса в зависимости от количества подписок

    Третий вопрос, на который я больше всего сомневаюсь, но я отвечу на него для полноты, предполагая, что вы знаете свое приложение лучше, чем я (поскольку я не могу придумать причину, по которой вам понадобится конкретная обработка на разных уровнях использования,чем между 0 и 1).

В основном я бы использовал тот же подход, что и выше, но я будуd инкапсулируем логику перехода немного по-другому.

// Same as above
class ExpensiveResource {
  unsubscribe() {  console.log('Tear down this resource!')}
}

const usingReferenceTracking = 
  (onUp, onDown) => (resourceFactory, streamFactory) => {
    let instance, refCount = 0
    // Again manage the global resource state with using
    const r$ = using(
      // Unfortunately the using pattern doesn't let the resource escape the closure
      // so we need to cache it for ourselves to use later
      () => instance || (instance = resourceFactory()),
      // Forward stream creation as normal
      streamFactory
      )
    ).pipe(
      // Don't forget to clean up the stream after all is said and done
      // Because its behind a share this should only happen when all subscribers unsubscribe
      finalize(() => instance = null)
      share()
    )
    // Use defer to trigger "onSubscribe" side-effects
    // Note as well that these side-effects could be merged with the above for improved performance
    // But I prefer them separate for easier maintenance.
    return defer(() => onUp(instance, refCount += 1) || r$)
      // Use finalize to handle the "onFinish" side-effects
      .pipe(finalize(() => onDown(instance, refCount -= 1)))

}

const referenceTracked$ = usingReferenceTracking(
  (ref, count) => console.log('Ref count increased to ' + count),
  (ref, count) => console.log('Ref count decreased to ' + count)
)(
  () => new ExpensiveResource(),
  ref => timer(1000)
)

referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x))
referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x))


// Ref count increased to 1
// Ref count increased to 2
// Sub1 0
// Ref count decreased to 1
// Sub2 0
// Ref count decreased to 0
// Tear down this resource!

Предупреждение : Одним из побочных эффектов этого является то, что по определению поток будет теплым, как только он выйдет из функции usingReferenceTracking, и онбудет горячо при первой подписке.Обязательно учитывайте это на этапе подписки.

5 голосов
/ 21 мая 2019

Вы можете достичь этого, используя defer для отслеживания подписок и finalize для отслеживания завершений, например, в качестве оператора:

// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let counter = 0;

    return defer(()=>{
      counter++;
      onCountUpdate(counter);
      return source$;
    })
    .pipe(
      finalize(()=>{
        counter--;
        onCountUpdate(counter);
      })
    );
  };
}

// just a stub for `onCountUpdate`
function noop(){}

И затем используйте его следующим образом:

const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);

Вот фрагмент кода, иллюстрирующий это:

const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;


const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);

// emit events
setTimeout(()=>{
  source$.next('one');
}, 250);

setTimeout(()=>{
  source$.next('two');
}, 1000);

setTimeout(()=>{
  source$.next('three');
}, 1250);

setTimeout(()=>{
  source$.next('four');
}, 1750);


// subscribe and unsubscribe
const subscriptionA = result$
  .subscribe(value => console.log('A', value));

setTimeout(()=>{
  result$.subscribe(value => console.log('B', value));
}, 500);


setTimeout(()=>{
  result$.subscribe(value => console.log('C', value));
}, 1000);

setTimeout(()=>{
  subscriptionA.unsubscribe();
}, 1500);


// complete source
setTimeout(()=>{
  source$.complete();
}, 2000);


function customOperator(onCountUpdate = noop) {
  return function refCountOperatorFunction(source$) {
    let counter = 0;

    return defer(()=>{
      counter++;
      onCountUpdate(counter);
      return source$;
    })
    .pipe(
      finalize(()=>{
        counter--;
        onCountUpdate(counter);
      })
    );
  };
}

function noop(){}
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>

* ПРИМЕЧАНИЕ: если ваш источник $ холодный - вам может понадобиться поделиться этим.

Надеюсь, это поможет

3 голосов
/ 25 мая 2019

Какая забавная проблема! Если я понимаю, о чем вы спрашиваете, вот мое решение: создайте класс-оболочку для Observable, который отслеживает подписки, перехватывая как subscribe(), так и unsubscribe(). Вот класс оболочки:

export class CountSubsObservable<T> extends Observable<T>{
  private _subCount = 0;
  private _subCount$: BehaviorSubject<number> = new BehaviorSubject(0);
  public subCount$ = this._subCount$.asObservable();

  constructor(public source: Observable<T>) {
    super();
  }

  subscribe(
    observerOrNext?: PartialObserver<T> | ((value: T) => void), 
    error?: (error: any) => void, 
    complete?: () => void 
  ): Subscription {
    this._subCount++;
    this._subCount$.next(this._subCount);
    let subscription = super.subscribe(observerOrNext as any, error, complete);
    const newUnsub: () => void = () => {
      if (this._subCount > 0) {
        this._subCount--;
        this._subCount$.next(this._subCount);
        subscription.unsubscribe();
      }
    }
    subscription.unsubscribe = newUnsub;
    return subscription;
  }
}

Эта оболочка создает вторичную наблюдаемую .subCount$, на которую можно подписаться, которая будет генерировать каждый раз, когда изменяется число подписок на исходную наблюдаемую. Он выдаст номер, соответствующий текущему количеству подписчиков.

Чтобы использовать его, вы должны создать наблюдаемый источник и затем вызвать new с этим классом для создания оболочки. Например:

const source$ = interval(1000).pipe(take(10));

const myEvent$: CountSubsObservable<number> = new CountSubsObservable(source$);

myEvent$.subCount$.subscribe(numSubs => {
  console.log('subCount$ notification! Number of subscriptions is now', numSubs);
  if(numSubs === 0) {
    // release global resource
  } else {
    // allocate global resource, if not yet allocated
  }
  // for a scalable resource usage / load,
  // re-configure it, based on numSubs
});

source$.subscribe(result => console.log('result is ', result));

Чтобы увидеть его в использовании, проверьте это Stackblitz .

UPDATE:

Хорошо, как уже упоминалось в комментариях, я немного пытаюсь понять, откуда поступает поток данных. Оглядываясь назад на ваш вопрос, я вижу, что вы предоставляете «интерфейс подписки на события». Если поток данных является потоком CustomType, как вы подробно описали в своем третьем обновлении выше, то вы можете использовать fromEvent() из rxjs для создания наблюдаемого источника, с помощью которого вы бы вызвали предоставленный мною класс-оболочку.

Чтобы показать это, я создал новый Stackblitz . Из этого Stackblitz вот поток CustomType s и как я бы использовал класс CountedObservable для достижения того, что вы ищете.

class CustomType {
  a: string;
}

const dataArray = [
  { a: 'January' },
  { a: 'February' },
  { a: 'March' },
  { a: 'April' },
  { a: 'May' },
  { a: 'June' },
  { a: 'July' },
  { a: 'August' },
  { a: 'September' },
  { a: 'October' },
  { a: 'November' },
  { a: 'December' }
] as CustomType[];

// Set up an arbitrary source that sends a stream of `CustomTypes`, one
// every two seconds by using `interval` and mapping the numbers into
// the associated dataArray.  
const source$ = interval(2000).pipe(
  map(i => dataArray[i]), // transform the Observable stream into CustomTypes
  take(dataArray.length),  // limit the Observable to only emit # array elements
  share() // turn into a hot Observable.
);

const myEvent$: CountedObservable<CustomType> = new CountedObservable(source$);

myEvent$.onCount.subscribe(newCount => {
  console.log('newCount notification! Number of subscriptions is now', newCount);
});

Надеюсь, это поможет.

1 голос
/ 28 мая 2019

Прежде всего, я очень ценю, сколько времени и усилий потратили люди, пытаясь ответить на мой вопрос! И я уверен, что эти ответы окажутся полезным руководством для других разработчиков, решающих аналогичные сценарии с помощью RXJS.

Однако, специально для того, что я пытался получить от RXJS, в конце концов я обнаружил, что мне лучше не использовать его вообще. Я специально хотел следующее:

Общий, простой в использовании интерфейс для подписки на уведомления, а также мониторинга подписок - все в одном. С RXJS лучшее, что я бы закончил, это некоторые обходные пути, которые кажутся излишне запутанными или даже загадочными для разработчиков, которые не являются экспертами в RXJS. Это не то, что я считаю дружественным интерфейсом, больше похоже на то, что звучит излишне инженерно.

В итоге у меня появился собственный, намного более простой интерфейс, который может делать все, что я искал:

export class Subscription {
    private unsub: () => void;

    constructor(unsub: () => void) {
        this.unsub = unsub;
    }

    public unsubscribe(): void {
        if (this.unsub) {
            this.unsub();
            this.unsub = null; // to prevent repeated calls
        }
    }
}

export class Observable<T = any> {
    protected subs: ((data: T) => void)[] = [];

    public subscribe(cb: (data: T) => void): Subscription {
        this.subs.push(cb);
        return new Subscription(this.createUnsub(cb));
    }

    public next(data: T): void {
        // we iterate through a safe clone, in case an un-subscribe occurs;
        // and since Node.js is the target, we are using process.nextTick:
        [...this.subs].forEach(cb => process.nextTick(() => cb(data)));
    }

    protected createUnsub(cb) {
        return () => {
            this.subs.splice(this.subs.indexOf(cb), 1);
        };
    }
}

export interface ISubCounts {
    newCount: number;
    prevCount: number;
}

export class CountedObservable<T = any> extends Observable<T> {    
    readonly onCount: Observable<ISubCounts> = new Observable();

    protected createUnsub(cb) {
        const s = this.subs;
        this.onCount.next({newCount: s.length, prevCount: s.length - 1});
        return () => {
            s.splice(s.indexOf(cb), 1);
            this.onCount.next({newCount: s.length, prevCount: s.length + 1});
        };
    }
}

Он небольшой и элегантный, и позволяет мне делать все, что мне нужно для начала, безопасным и дружелюбным способом. Я могу сделать то же самое subscribe и onCount.subscribe, и получить все те же уведомления:

const a = new CountedObservable<string>();

const countSub = a.onCount.subscribe(({newCount, prevCount}) => {
    console.log('COUNTS:', newCount, prevCount);
});

const sub1 = a.subscribe(data => {
    console.log('SUB-1:', data);
});

const sub2 = a.subscribe(data => {
    console.log('SUB-2:', data);
});

a.next('hello');

sub1.unsubscribe();
sub2.unsubscribe();
countSub.unsubscribe();

Надеюсь, это поможет и кому-то еще.

P.S. Я также улучшил его как независимый модуль .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...