Сбой подписки на канал, содержащий TakeUntil - PullRequest
0 голосов
/ 21 октября 2019

Я пытаюсь написать плагин Grafana, используя RxJS. Каждая логика для передачи моего потока данных в Grafana работает и весь конвейер работает.

Теперь мне нужно остановить поток в случае изменения каких-либо параметров. Я нашел в разных постах блога и в документации совет по использованию takeUntil.

Поэтому я создал наблюдаемую передачу события, когда я хочу прервать поток. Но любая попытка подписаться на поток сейчас терпит неудачу:

Вы указали неверный объект, где ожидался поток. Вы можете предоставить Observable, Promise, Array или Iterable.

Эта проблема существует только при введении takeUntil. Это особенно странно, поскольку я использую TypeScript, и поэтому тип возвращаемых данных должен быть безопасным.

Для устранения проблемы я вставил пример из docs

/// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
//after 5 seconds, emit value
const timer$ = timer(5000);
//when timer emits after 5s, complete source
const example = source.pipe(takeUntil(timer$));
//output: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));

. проблема сохраняется. Я проверил установленную версию rxjs, и пряжа решила ее до 6.5.3. Я проверил проблемы с GitHub для rxjs, но не нашел никаких связанных с этим проблем.

Я полагаю, что проблема заключается в моей среде, и окончательно я включил все свои зависимости:

 "dependencies": {
    "@grafana/toolkit": "next",
    "@grafana/ui": "latest",
    "@types/grafana": "github:CorpGlory/types-grafana.git",
    "@stomp/rx-stomp": "latest",
    "rxjs-tslint-rules": "^4.25.0"
    "sockjs-client": "^1.4.0",
    "ts-loader": "^6.2.0",
    "typescript": "^3.6.4"    
  }

Фрагментмоя собственная реализация

class LiveStreams {
 private streams;
 private stop$: Subject;

 constructor() {
   this.stop$ = new Subject<string>();
 }

 getStream(target): Observable<DataFrame[]> {
     const stop_id = this.stop$.pipe(takeWhile(id => id == target.streamId));
     stream = this.rxStomp.watch(target.streamName).pipe(
                finalize(() => {
                    delete this.streams[target.streamId];
                }),
                map((message: IMessage) => {
                    let content = JSON.parse(message.body).content;
                    appendResponseToBufferedData(content, data);
                    return [data];
                }),
                throttleTime(target.throttleTime),
                takeUntil(stop_id)
            );
       this.streams[target.streamId] = stream;
   } 
}

Любые подсказки, какие части моей среды для проверки будут оценены.

Ответы [ 3 ]

0 голосов
/ 21 октября 2019

Я не нашел реального ответа, но мне удалось обойти использование TakeUntil для Grafana.

Для всех, кто приезжает сюда для Grafana: Grafana автоматически отписывается, если запросы больше не совпадают. Поэтому просто необходимо удалить собственную кэшированную версию потока.

delete streams[streamId]
0 голосов
/ 24 октября 2019

Спасибо за ваш вопрос. Я не знаю, в чем причина ошибки. Я хотел бы иметь небольшой StackBlitz пример, чтобы иметь возможность отладить ваш пример, потому что код не так очевиден.

Мой совет для решения этой проблемы - больше перейти к реактивному подходу в этом решении. Вы можете создать наблюдаемый с вашими опциями и использовать оператор switchMap для решения вашего случая. Я думаю, что это будет намного более читабельным и обслуживаемым с точки зрения разработчика.

Я не знаю требований вашего класса. Поэтому, к сожалению, я не могу привести пример кода.

Удачи!

PS Пожалуйста, опубликуйте обновление, если вы уже решили эту проблему. Я хотел бы взглянуть на причину проблемы.

0 голосов
/ 21 октября 2019

Пожалуйста, попробуйте этот подход, он не завершен и не проверен, но должен дать вам некоторое указание:

  private streams: Observable<any>[];
  stopTriggers: Subject<boolean>[];

  ngOnInit() {
    this.getStream(123).subscribe(stream => {
      // console.log(stream);
    });
  }

   addStream(target) {
     this.stopTriggers[target.streamId] = new Subject<boolean>();
     const stream = of('something').pipe(
                // finalize(() => {
                //     delete this.streams[target.streamId];
                // }),
                // map((message: IMessage) => {
                //     let content = JSON.parse(message.body).content;
                //     appendResponseToBufferedData(content, data);
                //     return [data];
                // }),
                // throttleTime(target.throttleTime),
                takeUntil(this.stopTriggers[target.streamId])
            );
      this.streams[target.streamId] = stream;
   } 

   getStream(id): Observable<any> {
     return this.streams[id];
   }

   stopStream(id) {
     this.stopTriggers[id].next();
     this.stopTriggers[id].complete();
   }

   removeStream(id) {
     this.streams.splice(id,1);
   }
...