Это проблема синхронности.Observable.create
работает синхронно и создает наблюдаемую.Обратный вызов из вашего EventSource действительно вызывается, но уже слишком поздно, чтобы создать созданную наблюдаемую.
Решение состоит в том, чтобы использовать Subject
, который является членом класса обслуживания и будет сохраняться до тех пор, пока обратный вызов EventSource не сможет повлиятьЭто.Вам нужно будет «подключить» сервис к вашему компоненту, наблюдаемому в конструкторе компонента.Затем getAnswerStream()
становится triggerAnswerStream()
, который устанавливает события публикации событий в субъект.
Примерно так:
@Component({
selector: 'app-answers',
templateUrl: './answers.component.html',
providers: [AnswerReactiveService],
styleUrls: ['./answers.component.css']
})
export class AnswersComponent {
answers: Observable<Answer[]>;
constructor(private answerReactiveService: AnswerReactiveService) {
this.answers = answerReactiveService.answerStreamSubject.asObservable();
}
requestAnswerStream(): void {
this.answerReactiveService.getAnswerStream();
}
}
и
@Injectable()
export class AnswerReactiveService {
private answers: Answer[] = [];
private url: string = 'http://localhost:8080/events/1';
public answerStreamSubject: Subject<Array<Answer>>;
triggerAnswerStream(): void {
let url = this.url;
let eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
console.log('Received event: ', event);
const json = JSON.parse(event.data);
console.log(json);
this.answers.push(new Answer(json['answer']));
console.log(this.answers);
this.answerStreamSubject.next(this.answers);
};
eventSource.onerror = (error) => {
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
eventSource.close();
this.answerStreamSubject.complete();
} else {
this.answerStreamSubject.error('EventSource error: ' + error);
}
};
}
}