Будет ли сервер отправлять события, заставляющие Netty держать тот же поток, пока поток не будет закрыт? - PullRequest
0 голосов
/ 01 апреля 2020

Контекст: HTML5 интерфейс вызовет службу, отвечающую на Flux. Целью использования Spring WebFlux с Netty является использование меньшего количества потоков и передача событий в одностороннем порядке с сервера на фронтэн. Под событиями я подразумеваю многочисленные изменения статуса до конца. Стек полностью реактивен: Angular9 / Rx JS -> Spring WebFlux / Netty -> springframework.data.mongodb.repository.ReactiveMongoRepository -> MongoDb. Насколько я понимаю, это действительно неблокирующий стек (см. Фрагмент кода ниже, я уверен, что я нигде не блокирую). Кроме того, вы можете видеть, что SSE действительно включен: производит = MediaType.TEXT_EVENT_STREAM_VALUE для службы отдыха и EventSource на фронте.

Основной вопрос: поскольку от первого состояния до последнего состояния может потребоваться от 10 секунд до 30 секунд, будет ли поток удерживаться в течение этого времени? Я считаю долгое время, принимая во внимание, что у нас есть Sensedia Api Gateway. Если это так, я бы начал задаваться вопросом, есть ли какая-то польза от использования сервера без блокировки (например, Netty) по сравнению с сервером блокировки (например, Tomcat). Отказ от ответственности: я говорю только об избежании создания потоков и блокировок в моем конкретном c сценарии. Я не сравниваю серверы в целом.

Загрузка микросервиса:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.config.EnableWebFlux;

@EnableWebFlux
@SpringBootApplication
public class FluxdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(FluxdemoApplication.class, args);
    }

}

Конечная точка контроллера SSE:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import com.reactive.fluxdemo.domain.Transfer;
import com.reactive.fluxdemo.repository.TransferRepository;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.validation.Valid;

@RestController
public class TransferController {

    // Server Sent Events
    @GetMapping(value = "/stream/transfers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Transfer> streamAllTransfers() {
        return transferRepository.findAll();
    }

Домен:

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
public class Transfer {
    @Id
    private String id;
...
    private Integer status;

Репозиторий:

import org.springframework.stereotype.Repository;
import com.reactive.fluxdemo.domain.*;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

@Repository
public interface TransferRepository extends ReactiveMongoRepository<Transfer, String> {

}

FrontEnd Любой из приведенных ниже: HTML5 SSE. Для этого вопроса не имеет значения чистый HTML5 или более сложный Наблюдатель. Кстати, я вставил ниже оба, чтобы проиллюстрировать, что Front открывает канал Server Sent Events.

Упрощенная версия с чистым HTML5

<div id="content"></div>
<script>
    var source = new EventSource();
    source.addEventListener('message', function (e) {
        console.log('New message is received');
        const index = JSON.parse(e.data);
        const content = `New event added: ${index.status}<br>`;
        document.getElementById("content").innerHTML += content;
    }, false);
</script>

Полная версия с Angular / Rx Js Observer

import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';


@Injectable({
  providedIn: "root"
})
export class SseService {
  extratos: Extrato[] = [];
  constructor(private _zone: NgZone) { }

  getServerSentEvent(url: string): Observable<any> {
    this.extratos = [];
    return Observable.create(observer => {
      const eventSource = this.getEventSource(url);
      eventSource.onmessage = event => {
        this._zone.run(() => {
          let json = JSON.parse(event.data);
          this.extratos.push(new Extrato(json['id'], json['description'], json['value'], json['status']));
          observer.next(this.extratos);
        });
      };
      eventSource.onerror = (error) => {
        if (eventSource.readyState === 0) {
          console.log('The stream has been closed by the server.');
          eventSource.close();
          observer.complete();
        } else {
          observer.error('EventSource error: ' + error);
        }
      }

    });
  }
  private getEventSource(url: string): EventSource {
    return new EventSource(url);
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...