Как сохранить потоковые данные с помощью Akka Persistence - PullRequest
0 голосов
/ 08 мая 2018

Я использую StreamRefs для установления потоковых соединений между участниками в кластере. В настоящее время в узле записи я сохраняю входящие сообщения в файл журнала вручную, но мне интересно, можно ли заменить его постоянным Sink для записи и постоянным Source для чтения при запуске актера из журнала «Акка персистентность». Я думал о замене приемника файла журнала на persist { evt => ... } Постоянного актера, но так как он выполняется асинхронно, я потеряю противодавление. Так можно ли записывать потоковые данные с обратным давлением в журнал Akka Persistence и считывать эти данные потоковым способом при восстановлении субъекта?

Текущая реализация:

object Writer {
  case class WriteSinkRequest(userId: String) 
  case class WriteSinkReady(userId: String, sinkRef: SinkRef[ByteString])
  case class ReadSourceRequest(userId: String)
  case class ReadSourceReady(userId: String, sourceRef: SourceRef[ByteString])
}

class Writer extends Actor {

    // code omitted

    val logsDir = "logs"

    val path = Files.createDirectories(FileSystems.getDefault.getPath(logsDir))

    def logFile(id: String) = {
        path.resolve(id)
    }

    def logFileSink(logId: String): Sink[ByteString, Future[IOResult]] = FileIO.toPath(logFile(logId), Set(CREATE, WRITE, APPEND))
    def logFileSource(logId: String): Source[ByteString, Future[IOResult]] = FileIO.fromPath(logFile(logId))

    override def receive: Receive = {
        case WriteSinkRequest(userId) => 
            // obtain the source you want to offer:
            val sink = logFileSink(userId)
            // materialize the SinkRef (the remote is like a source of data for us):
            val ref: Future[SinkRef[ByteString]] = StreamRefs.sinkRef[ByteString]().to(sink).run()
            // wrap the SinkRef in some domain message, such that the sender knows what source it is
            val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
            // reply to sender
            reply.pipeTo(sender())

        case ReadSourceRequest(userId) =>
            val source = logFileSource(userId)
            val ref: Future[SourceRef[ByteString]] = source.runWith(StreamRefs.sourceRef())
            val reply: Future[ReadSourceReady] = ref.map(ReadSourceReady(userId, _))
            reply pipeTo sender()

    }
}

P.S. Можно ли создать не приемник "save-to-journal", а поток: incoming data to write ~> save to persistence journal ~> data that was written

1 Ответ

0 голосов
/ 08 мая 2018

Одной из идей для потоковой передачи данных постоянному действующему субъекту с противодавлением является использование Sink.actorRefWithAck: разрешить субъекту отправлять подтверждающее сообщение, когда оно сохранило сообщение.Это будет выглядеть примерно так:

// ...
case class WriteSinkReady(userId: String, sinkRef: SinkRef[MyMsg])    
// ...

def receive = {
  case WriteSinkRequest(userId) =>
    val persistentActor: ActorRef = ??? // a persistent actor that handles MyMsg messages
                                        // as well as the messages used in persistentSink

    val persistentSink: Sink[MyMsg, NotUsed] = Sink.actorRefWithAck[MyMsg](
      persistentActor,
      /* additional parameters: see the docs */
    )

    val ref: Future[SinkRef[MyMsg]] = StreamRefs.sinkRef[MyMsg]().to(persistentSink).run()
    val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
    reply.pipeTo(sender())

  case ReadSourceRequest(userId) =>
    // ...
}

В приведенном выше примере используется класс пользовательского случая с именем MyMsg вместо ByteString.

В отправителе, предполагая, что это актер:

def receive = {
  case WriteSinkReady(userId, sinkRef) =>
    source.runWith(sinkRef) // source is a Source[MyMsg, _]

  // ...
}

Материализованный поток в отправителе будет отправлять сообщения постоянному субъекту.

...