Я нашел несколько вопросов с одинаковым названием, и, насколько я мог видеть, некоторые из них предполагают, что решение в основном возвращает Observable вместо массива (другие относятся к FireBase, что не в моем случае). Что касается меня, то приведенный ниже код возвращает Observable (смотрите "getServerSentEvent (): Observable {return Observable.create ...")
Моя конечная цель - получить все события из потока. вернулся из Rest WebFlux. Я не упоминал ниже бэкэнда, потому что я почти уверен, что проблема связана с некоторой ошибкой в Angular.
Кроме того, я могу отлаживать и видеть, что события правильно поступают в extratos $ из приложения. .component.ts (см. изображение ниже).
Целые журналы
core.js:6185 ERROR Error: InvalidPipeArgument: '[object Object]' for pipe 'AsyncPipe'
at invalidPipeArgumentError (common.js:5743)
at AsyncPipe._selectStrategy (common.js:5920)
at AsyncPipe._subscribe (common.js:5901)
at AsyncPipe.transform (common.js:5879)
at Module.ɵɵpipeBind1 (core.js:36653)
at AppComponent_Template (app.component.html:8)
at executeTemplate (core.js:11949)
at refreshView (core.js:11796)
at refreshComponent (core.js:13229)
at refreshChildComponents (core.js:11527)
app.component.ts
import { Component, OnInit } from '@angular/core';
import { AppService } from './app.service';
import { SseService } from './sse.service';
import { Extrato } from './extrato';
import { Observable } from 'rxjs';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
providers: [SseService],
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
//extratos: any;
extratos$ : Observable<any>;
constructor(private appService: AppService, private sseService: SseService) { }
ngOnInit() {
this.getExtratoStream();
}
getExtratoStream(): void {
this.sseService
.getServerSentEvent("http://localhost:8080/extrato")
.subscribe(
data => {
this.extratos$ = data;
}
);
}
}
sse.service.ts
import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';
@Injectable({
providedIn: "root"
})
export class SseService {
extratos: Extrato[] = [];
constructor(private _zone: NgZone) { }
//getServerSentEvent(url: string): Observable<Array<Extrato>> {
getServerSentEvent(url: string): Observable<any> {
return Observable.create(observer => {
const eventSource = this.getEventSource(url);
eventSource.onmessage = event => {
this._zone.run(() => {
let json = JSON.parse(event.data);
this.extratos.push(new Extrato(json['id'], json['descricao'], json['valor']));
observer.next(this.extratos);
});
};
eventSource.onerror = (error) => {
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
eventSource.close();
observer.complete();
} else {
observer.error('EventSource error: ' + error);
}
}
});
}
private getEventSource(url: string): EventSource {
return new EventSource(url);
}
}
app.component. html
<h1>Extrato Stream</h1>
<div *ngFor="let ext of extratos$ | async">
<div>{{ext.descricao}}</div>
</div>
Свидетельство того, что наблюдаемые дополнительные данные заполнены