Копирование сообщений в персистентный типизированный субъект 2.5.17. - PullRequest
0 голосов
/ 21 сентября 2018

У меня есть актер со следующим поведением.Когда я прячу команды, они никогда не возвращаются в почтовый ящик.Используйте версии "akka-actor-typed" и "akka-persistence-typed" 2.5.17

object Blog {
  //#event
  sealed trait BlogEvent                                           extends Serializable
  final case class PostAdded(postId: String, content: PostContent) extends BlogEvent
  //#event

  //#state
  object BlogState {
    val empty = BlogState(None)
  }

  final case class BlogState(content: Option[PostContent]) {
    def withContent(newContent: PostContent): BlogState =
      copy(content = Some(newContent))
    def isEmpty: Boolean = content.isEmpty
    def postId: String = content match {
      case Some(c) ⇒ c.postId
      case None    ⇒ throw new IllegalStateException("postId unknown before post is created")
    }
  }
  //#state

  //#commands
  sealed trait BlogCommand                                                       extends Serializable
  final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
  final case class Accumulate(num: Int)                                          extends BlogCommand
  final case class AddPostDone(postId: String)
  final case class PostContent(postId: String, title: String, body: String)
  //#commands

  def behavior: Behavior[BlogCommand] =
    Behaviors.setup[BlogCommand] { ctx ⇒
      val buffer = StashBuffer[BlogCommand](capacity = 1000)

      //#initial-command-handler
      def initial(cmd: BlogCommand): Effect[BlogEvent, BlogState] =
        cmd match {
          case AddPost(content, replyTo) ⇒
            Effect
              .persist(PostAdded(content.postId, content))
              .thenRun { _ ⇒
                replyTo ! AddPostDone(content.postId)
                buffer.unstashAll(ctx, this.behavior)
              }
          case a @ Accumulate(_) =>
            buffer.stash(a)
            Effect.none
          case _ ⇒
            Effect.unhandled
        }
      //#initial-command-handler

      //#post-added-command-handler
      def postAdded(cmd: BlogCommand): Effect[BlogEvent, BlogState] = {
        cmd match {
          case _: AddPost ⇒
            Effect.unhandled
          case Accumulate(num) ⇒
            println(s"********Acumulate: $num**********")
            Effect.none
          case c ⇒
            Effect.unhandled
        }
      }
      //#post-added-command-handler

      //#by-state-command-handler
      val commandHandler: CommandHandler[BlogCommand, BlogEvent, BlogState] =
        (state, cmd) =>
          state match {
            case state if state.isEmpty ⇒ initial(cmd)
            case state if !state.isEmpty ⇒
              postAdded(cmd)
        }
      //#by-state-command-handler

      //#event-handler
      val eventHandler: (BlogState, BlogEvent) ⇒ BlogState = { (state, event) ⇒
        event match {
          case PostAdded(postId, content) ⇒
            state.withContent(content)
        }
      }
      //#event-handler

      //#behavior
      PersistentBehaviors.receive[BlogCommand, BlogEvent, BlogState](persistenceId = "Blog",
                                                                     emptyState = BlogState.empty,
                                                                     commandHandler,
                                                                     eventHandler)
    //#behavior
    }

}

Я пытаюсь:

    val blog = system.spawn(Blog.behavior, "Typed")

    blog ! Accumulate(1)
    blog ! Accumulate(2)
    blog ! Accumulate(3)
    blog ! AddPost(PostContent("id1", "title1", "body"), typedSystem.deadLetters)
    blog ! Accumulate(4)

Команды Accumulate (1), Accumulate (2), Accumulate (3) были сохранены в initial-command-handler, но когда выполняются buffer.unstashAll (ctx, this.behavior), команды теряются, они не возвращаются в почтовый ящик

...