Проблема с определением этапа пользовательского раздела (Не удается дважды подключиться к порту) - PullRequest
0 голосов
/ 23 апреля 2019

Итак, у меня есть небольшая пользовательская сцена для разделения в Akka Streams.

object CustomPartitioner {

    /**
     * Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
     *
     * @param partitionF applies function, if true, route to B, otherwise route to C.
     *
     * @tparam A type of input
     * @tparam B type of output on the first outlet.
     * @tparam C type of output on the second outlet.
     *
     * @return A partition stage
     */
    def apply[A, B, C](partitionF: A => Either[B, C]) =
      new GraphStage[FanOutShape2[A, B, C]] {
        private val in: Inlet[A] = Inlet[A]("in")
        private val outB = Outlet[B]("outB")
        private val outC = Outlet[C]("outC")
        private val pendingB = MutableQueue.empty[B]
        private val pendingC = MutableQueue.empty[C]

        override def shape: FanOutShape2[A, B, C] = new FanOutShape2(in, outB, outC)
        override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {

        setHandler(in, this)
        setHandler(outB, this)
        setHandler(outC, this)

        override def onPush(): Unit = {
          val elem = grab(in)
          partitionF(elem) match {
            case Left(b) =>
              pendingB.enqueue(b)
              tryPush(outB, pendingB, b)
            case Right(c) =>
              pendingC.enqueue(c)
              tryPush(outC, pendingC, c)
          }
        }

        override def onPull(): Unit = pull(in)

        private def tryPush[T](out: Outlet[T], pending: MutableQueue[T]): Unit =
          if (isAvailable(out) && pending.nonEmpty) push(out, pending.dequeue())
      }
  }

Я подключил это как разделитель к потоку, а затем слил его обратно в раковину.

Когда я пытаюсь протолкнуть сообщение через поток, используя компонентный тест

java.lang.IllegalArgumentException: Cannot pull port (in(256390569)) twice

, а затем проверка завершается неудачно с

java.lang.AssertionError: assertion failed: expected: expecting request() signal but got unexpected message CancelSubscription(PublisherProbeSubscription(akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$$anon$1@53c99b09,akka.testkit.TestProbe@2539cd1c))

Я почти уверен, что испортил вызовы setHandler, поскольку есть два из них для обработки как outB, так и outC. Однако я не знаю, как это исправить, чтобы вся эта система вызывала только onPush и onPull ровно один раз.

1 Ответ

0 голосов
/ 23 апреля 2019

Мне удалось заставить его работать

override def onPull(): Unit =
   if (!hasBeenPulled(in))
       pull(in)
...