Сериализация сообщения с protobuf для актера akka, который содержит сериализуемые данные - PullRequest
0 голосов
/ 12 февраля 2020

У меня есть постоянный субъект, который может получить один тип команды Persist(event), где событие - это тип trait Event (существует множество его реализаций). И в случае успеха он отправляет отправителю Persisted(event).

Само событие сериализуемо, поскольку именно эти данные мы храним в постоянном хранилище, а сериализация реализуется с помощью специального сериализатора, который внутренне использует классы, сгенерированные из google protobuf .proto файлы. И этот пользовательский сериализатор настроен на application.conf и привязан к базовой черте Event. Это уже работает нормально.

Примечание: Реализации Event являются , а не классами, сгенерированными protobuf. Это обычные scala классы, и у них тоже есть их протобуф-эквивалент, но они отображаются через специальный сериализатор, связанный с базовым типом события. Это было сделано моими предшественниками для управления версиями (что, вероятно, не требуется, потому что это может быть обработано с помощью простых классов protobuf + настраиваемой комбинации сериализации, но это совсем другое дело), ​​и я не буду изменять этот атм.

Сейчас мы пытаемся реализовать разбиение кластера для этого субъекта, что также означает, что мои команды (а именно Persist и Persisted) также должны быть сериализуемыми, поскольку они могут быть перенаправлены на другие узлы.

Это модель домена:

sealed trait PersistenceCommand {
  def event: Event
}

final case class Persisted(event: Event) extends PersistenceCommand
final case class Persist(event: Event) extends PersistenceCommand

Проблема в том, что я не вижу подходящего способа сделать его сериализуемым. Ниже приведены варианты, которые я рассмотрел

Подход 1. Определите новый файл прото для Persist и Persisted, но что я должен использовать в качестве типа данных для event? Я не нашел способа определить что-то вроде этого:

  message Persist {
   "com.example.Event" event = 1 // this doesn't work
   }

, чтобы я мог использовать существующую Scala черту Event в качестве типа данных. Если это работает, я думаю (хотя это далеко не так), я мог бы привязать сгенерированный код (после компиляции этого файла протока) к встроенному сериализатору akka для google protobuf, и он может работать. В приведенном выше примечании объясняется, почему я не могу использовать конструкцию oneof в своем файле прото.

Подход 2. Это то, что я реализовал, и это работает (но мне это не нравится )

По сути, я написал новый сериализатор для команд и делегировал seraizalition и десериализацию части команды event существующему сериализатору.

class PersistenceCommandSerializer extends SerializerWithStringManifest {
  val eventSerializer: ManifestAwareEventSerializer = new ManifestAwareEventSerializer()

  val PersistManifest   = Persist.getClass.getName
  val PersistedManifest = Persisted.getClass.getName
  val Separator         = "~"

  override def identifier: Int = 808653986

  override def manifest(o: AnyRef): String = o match {
    case Persist(event)   => s"$PersistManifest$Separator${eventSerializer.manifest(event)}"
    case Persisted(event) => s"$PersistedManifest$Separator${eventSerializer.manifest(event)}"
  }

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case command: PersistenceCommand => eventSerializer.toBinary(command.event)
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    val (commandManifest, dataManifest) = splitIntoCommandAndDataManifests(manifest)
    val event                           = eventSerializer.fromBinary(bytes, dataManifest).asInstanceOf[Event]
    commandManifest match {
      case PersistManifest =>
        Persist(event)
      case PersistedManifest =>
        Persisted(event)
    }
  }

  private def splitIntoCommandAndDataManifests(manifest: String) = {
    val commandAndDataManifests = manifest.split(Separator)
    (commandAndDataManifests(0), commandAndDataManifests(1))
  }
}

Проблема с этим подходом это то, что я делаю в def manifest и def fromBinary. Я должен был убедиться, что у меня есть манифест команды, а также манифест события при сериализации и десериализации. Следовательно, мне пришлось использовать ~ в качестве разделителя - своего рода, мой собственный метод сериализации для информации о манифесте.

Есть ли лучший или, возможно, правильный способ реализовать это?

Для контекста: я использую ScalaPB для генерации scala классов из .proto файлов и classi c актеров akka.

Любое руководство очень ценится!

1 Ответ

1 голос
/ 18 февраля 2020

Если вы делегируете сериализацию вложенного объекта какому-либо сконфигурированному сериализатору, у вложенного поля должно быть bytes для сериализованных данных, но также int32 с идентификатором используемого сериализатора и bytes для манифеста сообщения. , Это гарантирует, что вы сможете создавать версии / заменять вложенные сериализаторы, что важно для данных, которые будут храниться в течение более длительного периода времени.

Вы можете увидеть, как это делается внутри в Akka для наших собственных форматов проводов. здесь: https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/protobuf/WireFormats.proto#L48 и здесь https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala#L45

...