Я нашел несколько вопросов с одинаковым названием, и, насколько я мог видеть, некоторые из них предполагают, что решение в основном возвращает Observable вместо массива (другие относятся к FireBase, что не в моем случае). Что касается меня, то приведенный ниже код возвращает Observable (смотрите "getServerSentEvent (): Observable {return Observable.create ...")
Моя конечная цель - получить все события из потока. вернулся из Rest WebFlux. Я не упоминал ниже бэкэнда, потому что я почти уверен, что проблема связана с некоторой ошибкой в Angular.
Кроме того, я могу отлаживать и видеть, что события правильно поступают в extratos $ из приложения. .component.ts (см. изображение ниже).
Целые журналы
core.js:6185 ERROR Error: InvalidPipeArgument: '[object Object]' for pipe 'AsyncPipe'
at invalidPipeArgumentError (common.js:5743)
at AsyncPipe._selectStrategy (common.js:5920)
at AsyncPipe._subscribe (common.js:5901)
at AsyncPipe.transform (common.js:5879)
at Module.ɵɵpipeBind1 (core.js:36653)
at AppComponent_Template (app.component.html:8)
at executeTemplate (core.js:11949)
at refreshView (core.js:11796)
at refreshComponent (core.js:13229)
at refreshChildComponents (core.js:11527)
import { Component, OnInit } from '@angular/core';
import { AppService } from './app.service';
import { SseService } from './sse.service';
import { Extrato } from './extrato';
import { Observable } from 'rxjs';
selector: 'app-root',
templateUrl: './app.component.html',
providers: [SseService],
styleUrls: ['./app.component.css']
export class AppComponent implements OnInit {
//extratos: any;
extratos$ : Observable<any>;
constructor(private appService: AppService, private sseService: SseService) { }
ngOnInit() {
getExtratoStream(): void {
data => {
this.extratos$ = data;
import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';
providedIn: "root"
export class SseService {
extratos: Extrato[] = [];
constructor(private _zone: NgZone) { }
//getServerSentEvent(url: string): Observable<Array<Extrato>> {
getServerSentEvent(url: string): Observable<any> {
return Observable.create(observer => {
const eventSource = this.getEventSource(url);
eventSource.onmessage = event => {
this._zone.run(() => {
let json = JSON.parse(event.data);
this.extratos.push(new Extrato(json['id'], json['descricao'], json['valor']));
eventSource.onerror = (error) => {
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
} else {
observer.error('EventSource error: ' + error);
private getEventSource(url: string): EventSource {
return new EventSource(url);
app.component. html
<h1>Extrato Stream</h1>
<div *ngFor="let ext of extratos$ | async">
Свидетельство того, что наблюдаемые дополнительные данные заполнены
![enter image description here](https://i.stack.imgur.com/KCanY.png)