FS2 Динамически создаваемые подписчики - PullRequest
1 голос
/ 09 октября 2019

Я использую шаблон pub-sub в fs2. Я динамически создаю темы и подписчиков при обработке потока сообщений. По какой-то причине мои подписчики получают только первоначальное сообщение, но дальнейшие опубликованные сообщения никогда не доходят до подписчиков

def startPublisher2[In](inputStream: Stream[F, Event]): Stream[F, Unit] = {
    inputStream.through(processingPipe)
  }

  val processingPipe: Pipe[F, Event, Unit] = { inputStream =>
    inputStream.flatMap {
      case message: Message[_] => initSubscriber(message)
        .flatMap { topic => Stream.eval(topic.publish1(message)) }
    }
  }

  def initSubscriber[In](message: Message[In]): Stream[F,Topic[F, Event]] = {

    Option(sessions.get(message.sessionId)) match {
      case None =>
        println(s"=== Create new topic for sessionId=${message.sessionId}")

        val topic = Topic[F, Event](message)
        sessions.put(message.sessionId, topic)
         Stream.eval(topic) flatMap  {t =>
           //TODO: Is there a better solution?
           Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)
         }

      case Some(topic) =>
        println(s"=== Existing topic for sessionId=${message.sessionId}")
        Stream.eval(topic)
    }
  }

Код подписчика прост:

def startSubscribers2(topic: Topic[F, Event]): Stream[F, Unit] = {
    def processEvent(): Pipe[F, Event, Unit] =
      _.flatMap {
        case e@Text(_) =>
          Stream.eval(F.delay(println(s"Subscriber processing event: $e")))
        case Message(content, sessionId) =>
          //Thread.sleep(2000)
          Stream.eval(F.delay(println(s"Subscriber #$sessionId got message: ${content}")))
        case Quit =>
          println("Quit")
          Stream.eval(interrupter.set(true))
      }

    topic.subscribe(10).through(processEvent())
  }

Вывод следующий:

=== Create new topic for sessionId=11111111-1111-1111-1111-111111111111
Subscriber #11111111-1111-1111-1111-111111111111 got message: 1
=== Existing topic for sessionId=11111111-1111-1111-1111-111111111111
=== Create new topic for sessionId=22222222-2222-2222-2222-222222222222
Subscriber #22222222-2222-2222-2222-222222222222 got message: 1
=== Create new topic for sessionId=33333333-3333-3333-3333-333333333333
Subscriber #33333333-3333-3333-3333-333333333333 got message: 1
=== Existing topic for sessionId=22222222-2222-2222-2222-222222222222
=== Existing topic for sessionId=22222222-2222-2222-2222-222222222222

Я не вижу сообщений, опубликованных в существующей теме. Кроме того, мне интересно, есть ли лучший способ запустить асинхронный поток подписчиков вместо Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)

...