Я пытаюсь написать плагин Grafana, используя RxJS. Каждая логика для передачи моего потока данных в Grafana работает и весь конвейер работает.
Теперь мне нужно остановить поток в случае изменения каких-либо параметров. Я нашел в разных постах блога и в документации совет по использованию takeUntil
.
Поэтому я создал наблюдаемую передачу события, когда я хочу прервать поток. Но любая попытка подписаться на поток сейчас терпит неудачу:
Вы указали неверный объект, где ожидался поток. Вы можете предоставить Observable, Promise, Array или Iterable.
Эта проблема существует только при введении takeUntil. Это особенно странно, поскольку я использую TypeScript, и поэтому тип возвращаемых данных должен быть безопасным.
Для устранения проблемы я вставил пример из docs
/// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
//emit value every 1s
const source = interval(1000);
//after 5 seconds, emit value
const timer$ = timer(5000);
//when timer emits after 5s, complete source
const example = source.pipe(takeUntil(timer$));
//output: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
. проблема сохраняется. Я проверил установленную версию rxjs, и пряжа решила ее до 6.5.3. Я проверил проблемы с GitHub для rxjs, но не нашел никаких связанных с этим проблем.
Я полагаю, что проблема заключается в моей среде, и окончательно я включил все свои зависимости:
"dependencies": {
"@grafana/toolkit": "next",
"@grafana/ui": "latest",
"@types/grafana": "github:CorpGlory/types-grafana.git",
"@stomp/rx-stomp": "latest",
"rxjs-tslint-rules": "^4.25.0"
"sockjs-client": "^1.4.0",
"ts-loader": "^6.2.0",
"typescript": "^3.6.4"
}
Фрагментмоя собственная реализация
class LiveStreams {
private streams;
private stop$: Subject;
constructor() {
this.stop$ = new Subject<string>();
}
getStream(target): Observable<DataFrame[]> {
const stop_id = this.stop$.pipe(takeWhile(id => id == target.streamId));
stream = this.rxStomp.watch(target.streamName).pipe(
finalize(() => {
delete this.streams[target.streamId];
}),
map((message: IMessage) => {
let content = JSON.parse(message.body).content;
appendResponseToBufferedData(content, data);
return [data];
}),
throttleTime(target.throttleTime),
takeUntil(stop_id)
);
this.streams[target.streamId] = stream;
}
}
Любые подсказки, какие части моей среды для проверки будут оценены.