Я использую шаблон pub-sub в fs2. Я динамически создаю темы и подписчиков при обработке потока сообщений. По какой-то причине мои подписчики получают только первоначальное сообщение, но дальнейшие опубликованные сообщения никогда не доходят до подписчиков
def startPublisher2[In](inputStream: Stream[F, Event]): Stream[F, Unit] = {
inputStream.through(processingPipe)
}
val processingPipe: Pipe[F, Event, Unit] = { inputStream =>
inputStream.flatMap {
case message: Message[_] => initSubscriber(message)
.flatMap { topic => Stream.eval(topic.publish1(message)) }
}
}
def initSubscriber[In](message: Message[In]): Stream[F,Topic[F, Event]] = {
Option(sessions.get(message.sessionId)) match {
case None =>
println(s"=== Create new topic for sessionId=${message.sessionId}")
val topic = Topic[F, Event](message)
sessions.put(message.sessionId, topic)
Stream.eval(topic) flatMap {t =>
//TODO: Is there a better solution?
Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)
}
case Some(topic) =>
println(s"=== Existing topic for sessionId=${message.sessionId}")
Stream.eval(topic)
}
}
Код подписчика прост:
def startSubscribers2(topic: Topic[F, Event]): Stream[F, Unit] = {
def processEvent(): Pipe[F, Event, Unit] =
_.flatMap {
case e@Text(_) =>
Stream.eval(F.delay(println(s"Subscriber processing event: $e")))
case Message(content, sessionId) =>
//Thread.sleep(2000)
Stream.eval(F.delay(println(s"Subscriber #$sessionId got message: ${content}")))
case Quit =>
println("Quit")
Stream.eval(interrupter.set(true))
}
topic.subscribe(10).through(processEvent())
}
Вывод следующий:
=== Create new topic for sessionId=11111111-1111-1111-1111-111111111111
Subscriber #11111111-1111-1111-1111-111111111111 got message: 1
=== Existing topic for sessionId=11111111-1111-1111-1111-111111111111
=== Create new topic for sessionId=22222222-2222-2222-2222-222222222222
Subscriber #22222222-2222-2222-2222-222222222222 got message: 1
=== Create new topic for sessionId=33333333-3333-3333-3333-333333333333
Subscriber #33333333-3333-3333-3333-333333333333 got message: 1
=== Existing topic for sessionId=22222222-2222-2222-2222-222222222222
=== Existing topic for sessionId=22222222-2222-2222-2222-222222222222
Я не вижу сообщений, опубликованных в существующей теме. Кроме того, мне интересно, есть ли лучший способ запустить асинхронный поток подписчиков вместо Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)