Проблема с реактивными потоками - PullRequest
2 голосов
/ 20 января 2020
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
q.add("1");
q.add("2");
q.add("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();

webSocketClient.execute(new URI("wss://echo.websocket.org"), session -> session
        .send(Flux.just("INIT").map(session::textMessage))
        .thenMany(session
                .send(Flux.<String>generate(sink ->
                {
                    if (q.peek() != null)
                        sink.next(q.poll());
                    else
                        sink.complete();
                }).map(session::textMessage))
        )
        .thenMany(session
                .receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(s -> "Received: " + s)
                .log()
        )
        .then())
        .subscribe();

int i = 0;
while (true)
{
    String msg = "MSG #" + i++;
    q.add(msg);
    Thread.sleep(1000);
}

Вывод:

INFO reactor.Flux.Map.1 - onNext(Received: INIT)
INFO reactor.Flux.Map.1 - onNext(Received: 1)
INFO reactor.Flux.Map.1 - onNext(Received: 2)
INFO reactor.Flux.Map.1 - onNext(Received: 3)
INFO reactor.Flux.Map.1 - onNext(Received: MSG #0)

И тогда он останавливается. while (true) всегда заполняет очередь. Насколько я понимаю, способ, которым я использовал thenMany, должен был генерировать новый Flux с содержанием ConcurrentLinkedQueue каждый раз, когда предыдущий был помечен как complete(). Но это, похоже, не работает.

Редактировать: По сути, я хочу, чтобы отправлять данные в веб-сокет за пределами лямбда-области. Вот почему я создал очередь и использовал .thenMany(session.send(Flux.<String>generate..... Я ожидал, что она продолжит чтение из очереди, в то время как другие потоки добавляют в нее данные.

Ответы [ 2 ]

1 голос
/ 22 января 2020

Проблема в том, что вы комбинируете отправку и получение с thenMany методом на Mono и Flux. thenMany метод заставляет Flux игнорировать элемент из этого потока и реагировать только на сигнал завершения.

Итак, ничего не произойдет, если вы не вызовете sink.complete(). Но после вызова метода complete дальнейшие события отправляться не будут, даже если будет запрошено.

Отправка и получение должны выполняться независимо.

Также вместо ConcurrentLinkedQueue FluxProcessor и FluxSink могут быть использованы. EmitterProcessor может излучать нескольким подписчикам, сохраняя противодавление для каждого из своих подписчиков. Когда у него нет подписчика, он все еще может принять несколько потоков данных до настраиваемых bufferSize.

int bufferSize = 10;
FluxProcessor<String, String> processor =
    EmitterProcessor.<String>create(bufferSize).serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);

sink.next("1");
sink.next("2");
sink.next("3");

WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"),
    session -> {
      Flux<WebSocketMessage> out = Flux.just("INIT")
          .concatWith(processor)
          .map(session::textMessage);

      session.send(out)
          .subscribe(); //instead of thenMany

      return session.receive()
          .map(WebSocketMessage::getPayloadAsText)
          .map(s -> "Received: " + s)
          .log()
          .then();
    })
    .subscribe();

for (int i = 1; i <= 10; i++) {
  sink.next("MSG #" + i);
  TimeUnit.SECONDS.sleep(1);
}
sink.complete();

Журналы:

17:57:54.177 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
17:57:54.178 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - request(unbounded)
17:57:54.304 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: INIT)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 1)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 2)
17:57:54.306 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 3)
17:57:54.307 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #1)
17:57:54.396 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #2)
17:57:55.454 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #3)
17:57:56.480 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #4)
17:57:57.505 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #5)
17:57:58.412 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #6)
17:57:59.448 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #7)
17:58:00.484 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #8)
17:58:01.496 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #9)
17:58:02.434 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #10)
0 голосов
/ 20 января 2020

Как я вижу, вы ищете очередь, которая всегда генерирует поступающие данные. поэтому я предлагаю вам использовать процессор WebFlux и разделять отправку и получение в 2 разных потока;

    EmitterProcessor<String> e = EmitterProcessor.create();
    e.onNext("1");

    WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();



    webSocketClient.execute(new java.net.URI("wss://echo.websocket.org"), webSocketSession -> {
        return    Mono.zip( listenData(webSocketSession),
                sendSms(webSocketSession,e),(aVoid, aVoid2) -> aVoid);

    }).subscribe();



    int i = 0;
    while (true)
    {
        String msg = "MSG #" + i++;

        e.onNext(msg);
        Thread.sleep(1000);
    }


 private Mono<Void> sendSms(WebSocketSession webSocketSession,EmitterProcessor<String> e) {
   return webSocketSession.send(e.map(webSocketSession::textMessage).log());
}

private Mono<Void> listenData(WebSocketSession webSocketSession) {
    return webSocketSession
            .receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(s -> "Received: " + s)
            .log()
            .then();
}
...