Контекст: HTML5 интерфейс вызовет службу, отвечающую на Flux. Целью использования Spring WebFlux с Netty является использование меньшего количества потоков и передача событий в одностороннем порядке с сервера на фронтэн. Под событиями я подразумеваю многочисленные изменения статуса до конца. Стек полностью реактивен: Angular9 / Rx JS -> Spring WebFlux / Netty -> springframework.data.mongodb.repository.ReactiveMongoRepository -> MongoDb. Насколько я понимаю, это действительно неблокирующий стек (см. Фрагмент кода ниже, я уверен, что я нигде не блокирую). Кроме того, вы можете видеть, что SSE действительно включен: производит = MediaType.TEXT_EVENT_STREAM_VALUE для службы отдыха и EventSource на фронте.
Основной вопрос: поскольку от первого состояния до последнего состояния может потребоваться от 10 секунд до 30 секунд, будет ли поток удерживаться в течение этого времени? Я считаю долгое время, принимая во внимание, что у нас есть Sensedia Api Gateway. Если это так, я бы начал задаваться вопросом, есть ли какая-то польза от использования сервера без блокировки (например, Netty) по сравнению с сервером блокировки (например, Tomcat). Отказ от ответственности: я говорю только об избежании создания потоков и блокировок в моем конкретном c сценарии. Я не сравниваю серверы в целом.
Загрузка микросервиса:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.config.EnableWebFlux;
@EnableWebFlux
@SpringBootApplication
public class FluxdemoApplication {
public static void main(String[] args) {
SpringApplication.run(FluxdemoApplication.class, args);
}
}
Конечная точка контроллера SSE:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import com.reactive.fluxdemo.domain.Transfer;
import com.reactive.fluxdemo.repository.TransferRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.validation.Valid;
@RestController
public class TransferController {
// Server Sent Events
@GetMapping(value = "/stream/transfers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Transfer> streamAllTransfers() {
return transferRepository.findAll();
}
Домен:
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
public class Transfer {
@Id
private String id;
...
private Integer status;
Репозиторий:
import org.springframework.stereotype.Repository;
import com.reactive.fluxdemo.domain.*;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
@Repository
public interface TransferRepository extends ReactiveMongoRepository<Transfer, String> {
}
FrontEnd Любой из приведенных ниже: HTML5 SSE. Для этого вопроса не имеет значения чистый HTML5 или более сложный Наблюдатель. Кстати, я вставил ниже оба, чтобы проиллюстрировать, что Front открывает канал Server Sent Events.
Упрощенная версия с чистым HTML5
<div id="content"></div>
<script>
var source = new EventSource();
source.addEventListener('message', function (e) {
console.log('New message is received');
const index = JSON.parse(e.data);
const content = `New event added: ${index.status}<br>`;
document.getElementById("content").innerHTML += content;
}, false);
</script>
Полная версия с Angular / Rx Js Observer
import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';
@Injectable({
providedIn: "root"
})
export class SseService {
extratos: Extrato[] = [];
constructor(private _zone: NgZone) { }
getServerSentEvent(url: string): Observable<any> {
this.extratos = [];
return Observable.create(observer => {
const eventSource = this.getEventSource(url);
eventSource.onmessage = event => {
this._zone.run(() => {
let json = JSON.parse(event.data);
this.extratos.push(new Extrato(json['id'], json['description'], json['value'], json['status']));
observer.next(this.extratos);
});
};
eventSource.onerror = (error) => {
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
eventSource.close();
observer.complete();
} else {
observer.error('EventSource error: ' + error);
}
}
});
}
private getEventSource(url: string): EventSource {
return new EventSource(url);
}
}