В Akka, как я могу направить ответы от нижестоящего актера к правильному восходящему? - PullRequest
0 голосов
/ 20 мая 2018

Вопрос новичка: я пытаюсь создать один менеджер кэша, который будет расположен между несколькими пользователями кэша (= upstream) и клиентом Redis (downstream), поэтому:

Client A  -----> |                
                 | Cache Manager <=====> Redis Connection --(tcp)--
Client B  -----> |

Идея состоит в том, чтобыповторно использовать одно соединение с Redis.Я могу отправлять команды SET асинхронно, когда ответы возвращаются от клиента Redis-клиента, как я узнаю, на какой клиент нужно передать ответ?Вот мой метод получения до сих пор:

def receive: PartialFunction[Any, Unit] = {

  case Store(key: ByteString, payload: ByteString, metadata: ByeString) => {
    // WIP: yes, I could batch these two here
    brandoClient ! Request(REDIS_SET, metadata_key(key), metadata)
    brandoClient ! Request(REDIS_SET, key, payload)
  }

  case Some(Ok) => {
    ???
  }
  ...
}

Я мог бы сделать:

case Store(key: ByteString, payload: ByteString) => {
  val future = brandoClient ? Request(REDIS_SET, key, payload)
  sender() ! Await.result(future, request_timeout.duration)
}

Но это сделает блок управления кешем.

Другой способ, о котором я могу подумать, - это создать несколько акторов Cache Manager, которые ссылаются на один и тот же клиент Redis ActorRef, чтобы я мог таким образом дедуплицировать ответы.Вот так:

Client A  -----> Cache Manager A -----> |               
                                        | Redis Connection --(tcp)--
Client B  -----> Cache Manager B -----> |

Это единственный способ сделать это?

Спасибо,

Ответы [ 2 ]

0 голосов
/ 21 мая 2018

Вместо блокировки вы можете передать результат Future отправителю.В следующем примере предполагается, что вы используете клиент Redis Brando :

import akka.actor.Actor
import akka.pattern.{ ask, pipe }
import akka.util.{ ByteString, Timeout }
import brando.{ Request, StatusReply }
import scala.concurrent.duration._

case class Store(key: ByteString, payload: ByteString)

class CacheManager extends Actor {
  import context.dispatcher
  implicit val timeout = Timeout(5 seconds)

  val brandoClient: ActorRef = ???

  def receive = {
    case Store(key, payload) =>
      (brandoClient ? Request("SET", key, payload))
        .mapTo[Some[StatusReply]]
        .pipeTo(sender())

    // case ...
  }
}
0 голосов
/ 20 мая 2018

Если вы можете изменить свою подпись сообщения, вы всегда можете добавить что-то похожее на http-идентификатор корреляции внутри каждого сообщения.

Каждый входящий запрос будет сопровождаться уникальный идентификатор , тогда вы можете сохранить внутреннюю карту типа Map[UUID, ActorRef].Если у вас есть данные для возврата, просто найдите соответствующий уникальный идентификатор на карте и отправьте данные обратно в указанный ActorRef.

...