Одной из идей для потоковой передачи данных постоянному действующему субъекту с противодавлением является использование Sink.actorRefWithAck
: разрешить субъекту отправлять подтверждающее сообщение, когда оно сохранило сообщение.Это будет выглядеть примерно так:
// ...
case class WriteSinkReady(userId: String, sinkRef: SinkRef[MyMsg])
// ...
def receive = {
case WriteSinkRequest(userId) =>
val persistentActor: ActorRef = ??? // a persistent actor that handles MyMsg messages
// as well as the messages used in persistentSink
val persistentSink: Sink[MyMsg, NotUsed] = Sink.actorRefWithAck[MyMsg](
persistentActor,
/* additional parameters: see the docs */
)
val ref: Future[SinkRef[MyMsg]] = StreamRefs.sinkRef[MyMsg]().to(persistentSink).run()
val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
reply.pipeTo(sender())
case ReadSourceRequest(userId) =>
// ...
}
В приведенном выше примере используется класс пользовательского случая с именем MyMsg
вместо ByteString
.
В отправителе, предполагая, что это актер:
def receive = {
case WriteSinkReady(userId, sinkRef) =>
source.runWith(sinkRef) // source is a Source[MyMsg, _]
// ...
}
Материализованный поток в отправителе будет отправлять сообщения постоянному субъекту.