Первый элемент иногда не включается во второй аргумент Flux.switchOnFirst? - PullRequest
0 голосов
/ 01 февраля 2020

Я пытаюсь использовать оператор SwitchOnFirst из реактора, что удивительно - за исключением того, что иногда трансформатор, передаваемый как второй аргумент BiFunction, кажется, не включает первый элемент. По сути, клиент отправляет на сервер 2 элемента свыше RSocket. Сервер программного кода выглядит следующим образом:

val socket = new AbstractRSocket() {

    override def requestChannel(payloads: Publisher[Payload]): Flux[Payload] =
      Flux.from(payloads).log.switchOnFirst((signal, all) => handle(signal.get(), all))

    private def handle(first: Payload, all: Flux[Payload]): Flux[Payload] =
      extractRoute(first) match {
        case Some("test.route") =>
          val source = Source.fromPublisher(all.log()).map(_.getDataUtf8)
          actorSink.runWith(source)
          return Flux.from(actorSource).map(DefaultPayload.create).runWith(Sink.asPublisher(false)))
      }

  }

При первом запуске клиента сервер получает оба элемента и публикует их в actorsink, как и ожидалось.

[2020-02-01 16:17:42,656] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onSubscribe([Fuseable] FluxDoFinally.DoFinallyFuseableSubscriber) {}
[2020-02-01 16:17:42,658] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | request(1) {}
[2020-02-01 16:17:42,664] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onNext(io.rsocket.util.ByteBufPayload@53e655e6) {}
[2020-02-01 16:17:42,731] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - onSubscribe(FluxSwitchOnFirst.SwitchOnFirstInner) {}
[2020-02-01 16:17:42,736] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - request(16) {}
[2020-02-01 16:17:42,739] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - onNext(io.rsocket.util.ByteBufPayload@53e655e6) {}
[2020-02-01 16:17:42,741] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [akka.actor.default-dispatcher-9] - | request(15) {}
[Sink] Received (item1)
[2020-02-01 16:17:42,769] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onNext(io.rsocket.util.ByteBufPayload@50215db3) {}
[2020-02-01 16:17:42,769] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [reactor-tcp-epoll-2] - onNext(io.rsocket.util.ByteBufPayload@50215db3) {}
[Sink] Received (item2)
[2020-02-01 16:17:42,770] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onComplete() {}
[2020-02-01 16:17:42,771] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [reactor-tcp-epoll-2] - onComplete() {}
[Sink] Completed

Однако , если я остановлю клиента и запуском его снова, будет опубликован только второй элемент.

[2020-02-01 16:18:13,746] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onSubscribe([Fuseable] FluxDoFinally.DoFinallyFuseableSubscriber) {}
[2020-02-01 16:18:13,746] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | request(1) {}
[2020-02-01 16:18:13,747] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onNext(io.rsocket.util.ByteBufPayload@5a2d7823) {}
[2020-02-01 16:18:13,751] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [akka.actor.default-dispatcher-6] - onSubscribe(FluxSwitchOnFirst.SwitchOnFirstInner) {}
[2020-02-01 16:18:13,752] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [akka.actor.default-dispatcher-6] - request(16) {}
[2020-02-01 16:18:13,752] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [akka.actor.default-dispatcher-6] - | request(16) {}
[2020-02-01 16:18:13,787] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onNext(io.rsocket.util.ByteBufPayload@1fa7bb46) {}
[2020-02-01 16:18:13,788] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [reactor-tcp-epoll-3] - onNext(io.rsocket.util.ByteBufPayload@1fa7bb46) {}
[Sink] Received (item2)
[2020-02-01 16:18:13,790] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onComplete() {}

Разница в том, что [SwitchOnFirstInner.4] request (16) запускает [DoFinallyFuseable.3] request (16) вместо onNext с использованием первого элемента, уже доступного в операторе SwitchOnFirst.

Возможно, я что-то делаю не так, но не могу понять, что именно. В javado c для switchOnFirst указано, что издатель, полученный из исходного Flux, должен быть возвращен во всех случаях, что здесь не так (входные данные отправляются на ActorSink, а выходные данные поступают из отдельного ActorSource), может ли это быть проблемой?

Я новичок в реакторе / rsocket, поэтому извиняюсь, если я упускаю что-то очевидное.

1 Ответ

0 голосов
/ 03 февраля 2020

Не извлечение потока Flux из потока all действительно может быть причиной, особенно если actorSink.runWith(source) запрашивает источник all, потому что оператор switchOnFirst должен позаботиться о запросе.

...