Почему я все еще получаю данные из весенней загрузки webflux, даже если я отписался? - PullRequest
0 голосов
/ 08 декабря 2018

Я изучаю реактивную сеть, для учебника я хотел получить результаты поиска в хештеге в твиттере из моего весеннего сервиса webflux для углового клиента 6.

При нажатии на localhost:8081/search/test в Chrome, Я получаю твиты в формате json реагирующим образом (твит от твита, а браузер показывает каждый приходящий).

так что для большего удовольствия я сделал небольшой угловой ввод для поиска, и я бы показал вконсольные твиты

проблема в том, что когда я ищу тег java, я получаю журналы консоли, тогда, если я попытаюсь найти тег spring, я буду регистрировать весенние твиты в консоли, а Java все ещеближайшие

Я провел некоторое исследование и обнаружил, что должен отписать моего потребителя от потока.

Я пытался реализовать это, но безуспешно

Вот что я пытался

Spring WebFlux Controller

private TwitHashLocalService localService;
    private TwitHashRemoteService remoteService;

    @GetMapping(value="search/{tag}",produces=MediaType.TEXT_EVENT_STREAM_VALUE) 
    public Flux<Tweet> getByTag(@PathVariable String tag) throws TwitterException{
        return localService.findByTag(tag).mergeWith(remoteService.findByTag(tag).doOnNext(tweet -> localService.save(tweet)));
    }

Мои услуги

local mongo db

private MongoService mongoService;

    public Flux<Tweet> findByTag(String tag) {

        return mongoService.findByTag(tag);
    }

удаленный твиттер поток flux

public Flux<Tweet> findByTag(String hashtag) throws TwitterException {

    return Flux.create(sink -> {
        TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
        twitterStream.onStatus(status -> sink.next(Tweet.fromStatus(status,hashtag)));
        twitterStream.onException(sink::error);
        twitterStream.filter(hashtag);
        sink.onCancel(twitterStream::shutdown);
    }); 

}

УГЛОВОЙ

Мой реактивный сервис поиска в Твиттере

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { ITweet } from './itweet';
import { Observable, of } from 'rxjs';


@Injectable({
    providedIn: 'root'
})
export class ReactiveTwitterService {

    myTweets: ITweet[] = new Array();

    tweetTag: string;

    baseUrl = 'http://localhost:8081/search';

    constructor(private http_client: HttpClient) { }

    getTweetStream(tag): Observable<Array<ITweet>> {

        this.myTweets = [];
        const url = this.baseUrl + '/' + tag;

        return Observable.create(observer => {
            const eventSource = new EventSource(url);
            eventSource.onmessage = (event) => {
                console.log('received event');
                const json = JSON.parse(event.data);
                console.log(json);
                console.log(json.tweetData.name, json.tweetData.text, json.tag);
                this.myTweets.push(new ITweet(json.tweetData.name, json.tweetData.text, json.tag));
                observer.next(this.myTweets);
            };
            eventSource.onerror = (error) => {
                // readyState === 0 (closed) means the remote source closed the connection,
                // so we can safely treat it as a normal situation. Another way of detecting the end of the stream
                // is to insert a special element in the stream of events, which the client can identify as the last one.
                if (eventSource.readyState === 0) {
                    console.log('The stream has been closed by the server.');
                    eventSource.close();
                    observer.complete();
                } else {
                    observer.error('EventSource error: ' + error);
                }
            };
        });   
    }   
}

компонентстрока поиска

import { Component, OnInit, HostListener } from '@angular/core';
import { ReactiveTwitterService } from '../reactive-twitter.service';
import { Observable, Subscription } from 'rxjs';
import { ITweet } from '../itweet';

@Component({
    selector: 'app-serach-bar',
    templateUrl: './serach-bar.component.html',
    styleUrls: ['./serach-bar.component.css']
})
export class SerachBarComponent implements OnInit {
    innerWidth: number;


    subscription: Subscription = new Subscription();
    placeholder = 'search';

    styleClass = {
        wide_screen: 'w3-input w3-light-grey',
        break_point: 'w3-input w3-white'
    };

    tweets: Observable<ITweet[]>;

    constructor(private twiterService: ReactiveTwitterService) { }

    doSearch(tag) {
        console.log('test' + tag);
        this.subscription.unsubscribe();
        this.tweets = this.twiterService.getTweetStream(tag);
        this.subscription.add(this.tweets.subscribe());
    }


    ngOnInit() {
    }

    @HostListener('window:resize', ['$event'])
    onResize(event) {
        this.innerWidth = window.innerWidth;
    }

    getStyle() {
        return (innerWidth > 769) ? this.styleClass.wide_screen : this.styleClass.break_point;
    }
}

Как вы видите в поиске, я пытаюсь отписаться перед исследованием, но это не работает

Что я долженделать?

1 Ответ

0 голосов
/ 08 декабря 2018

1) Убедитесь, что отмена распространяется в RxJs

Из кода, который я вижу, на угловой стороне нет обработчика отмены.

Попробуйте выполнить следующее, чтобы отреагировать наотменить подписку на стороне JS:

Observable.create(observer => {
        const eventSource = new EventSource(url);

        // your code here 

        return () => eventSource.close(); // on cancel function
    }); 

2) Добавить .log() ing

Я бы порекомендовал добавить дополнительную регистрацию в трубу Реактора, чтобы было ясно, какие сигналы распространяются через трубу,Для этого используйте оператор .log().

3) Убедитесь, что EventSource действительно закрыт на стороне браузера.

Используйте консоль отладки вашего браузера для наблюдения за всеми открытыми соединениями с сервером.Убедитесь, что после изменения поискового запроса тега / очистки соединение закрывается

4) Помните о возможной согласованности

Все распространяющиеся события через реактивный канал являются асинхронными и неблокирующими, поэтомуможет быть некоторая задержка между фактическим действием и окончательной отменой на стороне сервера

...