Unexpected increase in gossip interval and message size in Akka Distributed Data
2 votes
/ April 9, 2019

I am using Akka Distributed Data to replicate order data between three nodes. Currently I use only the CRDT types GSet and LWWRegister. However, shortly after a message is sent to the replicator, the gossip interval grows and the replicator logs a serialization error saying that the maximum allowed message size has been exceeded. The same error can be reproduced even with a much smaller message value, such as a single character. After about 10 minutes of running, the JVM runs out of Java heap space.

I followed the Akka documentation published here to implement the system.

I have been looking for a solution for several days. I have already tried changing application.conf to allow a larger frame size, increasing the ddata gossip interval, and isolating the data replication part of the application and running it on its own. I also tried the following approaches, and none of them gave me a proper solution: 1, 2, 3

Here is the part of my application.conf related to Akka remoting and Akka Distributed Data:

remote.artery {
    enabled = on
    transport = tcp
    canonical.port = 5053
    canonical.hostname = 127.0.0.1
    advanced {
            maximum-frame-size = 256KiB
            buffer-pool-size = 128
            maximum-large-frame-size = 4MiB
            large-buffer-pool-size = 32
    }
.....

akka.cluster.distributed-data {
  name = ddataReplicator
  role = "AthenaLB_1"
  gossip-interval = 2 s
  notify-subscribers-interval = 500 ms
  max-delta-elements = 1000
  use-dispatcher = ""
  pruning-interval = 120 s
  max-pruning-dissemination = 300 s
  pruning-marker-time-to-live = 6 h
  serializer-cache-time-to-live = 60 s

  # Settings for delta-CRDT
  delta-crdt {
    enabled = on
    max-delta-size = 200
  }

  durable {
    keys = []
    pruning-marker-time-to-live = 10 d
    store-actor-class = akka.cluster.ddata.LmdbDurableStore
    use-dispatcher = akka.cluster.distributed-data.durable.pinned-store
    pinned-store {
      executor = thread-pool-executor
      type = PinnedDispatcher
    }

    lmdb { 
      dir = "ddata"
      map-size = 100 MiB
      write-behind-interval = off
    }
  }
}
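
As a quick sanity check on the numbers, the sketch below converts the configured frame sizes to bytes. It suggests that the `max allowed size 262144 bytes` reported in the node log is the `maximum-frame-size = 256KiB` setting from the config above. The class and method names (`FrameSizeCheck`, `kibToBytes`, `fits`) are hypothetical helpers for illustration, not part of Akka.

```java
// Hypothetical helper, not an Akka API: converts the configured sizes to bytes.
class FrameSizeCheck {
    // 1 KiB = 1024 bytes.
    static long kibToBytes(long kib) {
        return kib * 1024L;
    }

    // 1 MiB = 1024 KiB.
    static long mibToBytes(long mib) {
        return kibToBytes(mib * 1024L);
    }

    // Illustrative check only; the exact comparison Akka performs is not assumed here.
    static boolean fits(long payloadBytes, long maxFrameBytes) {
        return payloadBytes <= maxFrameBytes;
    }

    public static void main(String[] args) {
        System.out.println(kibToBytes(256)); // maximum-frame-size in bytes
        System.out.println(mibToBytes(4));   // maximum-large-frame-size in bytes
        System.out.println(fits(300_000L, kibToBytes(256)));
    }
}
```

The first value printed is 262144, the same limit that appears in the `OversizedPayloadException` messages.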

Here are the methods I use to replicate state between the nodes. Note that I have omitted the irrelevant parts of the code.

private final ActorRef replicator = DistributedData.get(getContext().getSystem()).replicator();
private final SelfUniqueAddress node = DistributedData.get(getContext().getSystem()).selfUniqueAddress();
private static final Replicator.ReadConsistency readMajority = new Replicator.ReadMajority(Duration.ofSeconds(30));
private final Replicator.WriteConsistency writeMajority = new Replicator.WriteMajority(Duration.ofSeconds(30));


// Replicates the saved exchange list as an LWWRegister (last writer wins).
private void replicateState(ExchangeSupervisorProtos.SavedExchangeList recovery) {
    LOGGER.info("Sending replication message: {}", recovery.toString());
    Replicator.Update<LWWRegister<ExchangeSupervisorProtos.SavedExchangeList>> update =
            new Replicator.Update<>(
                    exchangeSupervisorRecoveryKey,
                    LWWRegister.create(node, recovery),
                    writeMajority,
                    curr -> updateExchangeSupervisorRecovery(curr, recovery));
    replicator.tell(update, self());
}

// Overwrites the register's current value with the new one.
private LWWRegister<ExchangeSupervisorProtos.SavedExchangeList> updateExchangeSupervisorRecovery(
        LWWRegister<ExchangeSupervisorProtos.SavedExchangeList> data,
        ExchangeSupervisorProtos.SavedExchangeList recovery) {
    return data.withValue(node, recovery, LWWRegister.defaultClock());
}

// Replicates a single exchange recovery entry by adding it to a GSet (grow-only set).
private void replicateState(ExchangeRecovery recovery) {
    LOGGER.info("Sending replication message: {}", recovery.toString());
    Replicator.Update<GSet<ExchangeRecovery>> update =
            new Replicator.Update<>(
                    exchangeRecoveryKey,
                    GSet.create(),
                    writeMajority,
                    curr -> updateExchangeRecovery(curr, recovery));
    replicator.tell(update, self());
}

private GSet<ExchangeRecovery> updateExchangeRecovery(GSet<ExchangeRecovery> data,
                                                      ExchangeRecovery recovery) {
    return data.add(recovery);
}
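
For readers less familiar with the two CRDT types used above, here is a minimal plain-Java sketch of their merge semantics. It is a simplified model, not Akka's implementation: `GrowOnlySet` and `LastWriterWins` are hypothetical stand-ins for GSet and LWWRegister, and the register ignores the node-based tie-breaking that a real implementation needs. It shows that a grow-only set can only get larger after each add or merge, while the register always holds a single value. Whether that growth plays any part in the errors below is an open assumption, not a diagnosis.

```java
import java.util.HashSet;
import java.util.Set;

class CrdtSketch {
    public static void main(String[] args) {
        GrowOnlySet<String> a = new GrowOnlySet<String>().add("order-1").add("order-2");
        GrowOnlySet<String> b = new GrowOnlySet<String>().add("order-2").add("order-3");
        // Union of {order-1, order-2} and {order-2, order-3}.
        System.out.println(a.merge(b).size());

        LastWriterWins<String> older = new LastWriterWins<>(1L, "old");
        LastWriterWins<String> newer = new LastWriterWins<>(2L, "new");
        // The value with the larger timestamp survives the merge.
        System.out.println(older.merge(newer).value);
    }
}

// Hypothetical, simplified model of a grow-only set; NOT Akka's GSet.
final class GrowOnlySet<T> {
    private final Set<T> elements;

    GrowOnlySet() {
        this(new HashSet<>());
    }

    private GrowOnlySet(Set<T> elements) {
        this.elements = elements;
    }

    // Returns a new set containing the element; there is no remove operation.
    GrowOnlySet<T> add(T element) {
        Set<T> copy = new HashSet<>(elements);
        copy.add(element);
        return new GrowOnlySet<>(copy);
    }

    // Merge is set union, so the result is never smaller than either input.
    GrowOnlySet<T> merge(GrowOnlySet<T> other) {
        Set<T> copy = new HashSet<>(elements);
        copy.addAll(other.elements);
        return new GrowOnlySet<>(copy);
    }

    int size() {
        return elements.size();
    }
}

// Hypothetical, simplified model of a last-writer-wins register; NOT Akka's LWWRegister.
final class LastWriterWins<T> {
    final long timestamp;
    final T value;

    LastWriterWins(long timestamp, T value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    // Keeps the value with the larger timestamp; on a tie this sketch keeps the receiver.
    LastWriterWins<T> merge(LastWriterWins<T> other) {
        return other.timestamp > timestamp ? other : this;
    }
}
```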

Here is part of the node's log.

[ERROR] [04/09/2019 11:54:48.330] [AlgoEngine-akka.remote.default-remote-dispatcher-5] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:54:56.824] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:02.724] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:07.974] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:14.427] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:18.940] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:24.855] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:30.995] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:36.779] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:44.069] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[WARN] [04/09/2019 11:55:49.770] [AlgoEngine-akka.actor.default-dispatcher-2] [akka.cluster.Cluster(akka://AlgoEngine)] Cluster Node [akka://AlgoEngine@127.0.0.1:5053] - Scheduled sending of heartbeat was delayed. Previous heartbeat was sent [2006] ms ago, expected interval is [1000] ms. This may cause failure detection to mark members as unreachable. The reason can be thread starvation, e.g. by running blocking tasks on the default dispatcher, CPU overload, or GC.
[WARN] [04/09/2019 11:55:49.772] [AlgoEngine-akka.actor.default-dispatcher-2] [akka.remote.PhiAccrualFailureDetector@61981fa5] heartbeat interval is growing too large for address akka://AlgoEngine@127.0.0.1:5051: 2007 millis
[WARN] [04/09/2019 11:55:49.776] [AlgoEngine-akka.actor.default-dispatcher-8] [akka.remote.PhiAccrualFailureDetector@4c4e6ca5] heartbeat interval is growing too large for address akka://AlgoEngine@127.0.0.1:5052: 2008 millis
[ERROR] [04/09/2019 11:55:51.887] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:55:58.012] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:04.127] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:10.890] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:16.821] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:23.487] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:29.192] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:35.278] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:41.683] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[WARN] [04/09/2019 11:56:52.906] [AlgoEngine-akka.actor.default-dispatcher-31] [akka.cluster.Cluster(akka://AlgoEngine)] Cluster Node [akka://AlgoEngine@127.0.0.1:5053] - Scheduled sending of heartbeat was delayed. Previous heartbeat was sent [3143] ms ago, expected interval is [1000] ms. This may cause failure detection to mark members as unreachable. The reason can be thread starvation, e.g. by running blocking tasks on the default dispatcher, CPU overload, or GC.
[WARN] [04/09/2019 11:56:52.911] [AlgoEngine-akka.actor.default-dispatcher-8] [akka.remote.PhiAccrualFailureDetector@61981fa5] heartbeat interval is growing too large for address akka://AlgoEngine@127.0.0.1:5051: 3148 millis
[WARN] [04/09/2019 11:56:52.917] [AlgoEngine-akka.actor.default-dispatcher-11] [akka.remote.PhiAccrualFailureDetector@4c4e6ca5] heartbeat interval is growing too large for address akka://AlgoEngine@127.0.0.1:5052: 3151 millis
[ERROR] [04/09/2019 11:56:56.014] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:56:57.216] [AlgoEngine-akka.remote.default-remote-dispatcher-5] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5052/system/ddataReplicator#-1928484884]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:57:02.880] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[ERROR] [04/09/2019 11:57:10.264] [AlgoEngine-akka.remote.default-remote-dispatcher-4] [Encoder(akka://AlgoEngine)] Failed to serialize oversized message [akka.cluster.ddata.Replicator$Internal$Gossip].
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Some(Actor[akka://AlgoEngine@127.0.0.1:5051/system/ddataReplicator#-1087129379]): max allowed size 262144 bytes. Message type [akka.cluster.ddata.Replicator$Internal$Gossip].

[WARN] [04/09/2019 11:58:51.367] [AlgoEngine-akka.actor.default-dispatcher-28] [akka.remote.PhiAccrualFailureDetector@61981fa5] heartbeat interval is growing too large for address akka://AlgoEngine@127.0.0.1:5051: 4549 millis
[ERROR] [SECURITY][04/09/2019 11:58:51.370] [AlgoEngine-akka.actor.default-dispatcher-24] [akka.actor.ActorSystemImpl(AlgoEngine)] Uncaught error from thread [AlgoEngine-akka.actor.default-dispatcher-24]: Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[AlgoEngine]
java.lang.OutOfMemoryError: Java heap space
    at akka.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:66)
    at akka.cluster.ddata.protobuf.ReplicatedDataSerializer.toBinary(ReplicatedDataSerializer.scala:376)
    at akka.cluster.ddata.protobuf.SerializationSupport.buildOther$1(SerializationSupport.scala:144)
    at akka.cluster.ddata.protobuf.SerializationSupport.otherMessageToProto(SerializationSupport.scala:160)
    at akka.cluster.ddata.protobuf.SerializationSupport.otherMessageToProto$(SerializationSupport.scala:138)
    at akka.cluster.ddata.protobuf.ReplicatorMessageSerializer.otherMessageToProto(ReplicatorMessageSerializer.scala:151)
    at akka.cluster.ddata.protobuf.ReplicatorMessageSerializer.dataEnvelopeToProto(ReplicatorMessageSerializer.scala:485)
    at akka.cluster.ddata.protobuf.ReplicatorMessageSerializer.toBinary(ReplicatorMessageSerializer.scala:233)
    at akka.cluster.ddata.Replicator.digest(Replicator.scala:1789)
    at akka.cluster.ddata.Replicator.getDigest(Replicator.scala:1778)
    at akka.cluster.ddata.Replicator.$anonfun$gossipTo$1(Replicator.scala:1924)
    at akka.cluster.ddata.Replicator$$Lambda$918/1723713864.apply(Unknown Source)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
    at scala.collection.TraversableLike$$Lambda$7/512549200.apply(Unknown Source)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:159)
    at scala.collection.TraversableLike.map(TraversableLike.scala:237)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at akka.cluster.ddata.Replicator.gossipTo(Replicator.scala:1924)
    at akka.cluster.ddata.Replicator.$anonfun$receiveGossipTick$1(Replicator.scala:1916)
    at akka.cluster.ddata.Replicator.$anonfun$receiveGossipTick$1$adapted(Replicator.scala:1916)
    at akka.cluster.ddata.Replicator$$Lambda$914/1652707776.apply(Unknown Source)
    at scala.Option.foreach(Option.scala:274)
    at akka.cluster.ddata.Replicator.receiveGossipTick(Replicator.scala:1916)
    at akka.cluster.ddata.Replicator$$anonfun$4.applyOrElse(Replicator.scala:1491)
    at akka.actor.Actor.aroundReceive(Actor.scala:539)
    at akka.actor.Actor.aroundReceive$(Actor.scala:537)
    at akka.cluster.ddata.Replicator.aroundReceive(Replicator.scala:1349)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
    at akka.actor.ActorCell.invoke(ActorCell.scala:579)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)

[WARN] [04/09/2019 11:58:51.377] [AlgoEngine-akka.actor.default-dispatcher-11] [akka.remote.PhiAccrualFailureDetector@4c4e6ca5] heartbeat interval is growing too large for address akka://AlgoEngine@127.0.0.1:5052: 4553 millis
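
To put numbers on how far the spacing between these errors drifts from the configured `gossip-interval = 2 s`, the timestamps can be extracted from the log and the gaps computed. The following is a minimal sketch under the assumption that every relevant line carries a bracketed timestamp in the `MM/dd/yyyy HH:mm:ss.SSS` form shown above; `LogGaps` and `gapsMillis` are hypothetical names, and the sample lines are shortened copies of the first three errors in the log.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for inspecting the log above; not part of Akka.
class LogGaps {
    // Matches the first bracketed timestamp such as [04/09/2019 11:54:48.330].
    private static final Pattern TIMESTAMP =
            Pattern.compile("\\[(\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})\\]");
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss.SSS");

    // Returns the gaps in milliseconds between consecutive lines containing the marker.
    static List<Long> gapsMillis(List<String> lines, String marker) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : lines) {
            if (!line.contains(marker)) {
                continue; // skip unrelated lines, e.g. heartbeat warnings
            }
            Matcher matcher = TIMESTAMP.matcher(line);
            if (!matcher.find()) {
                continue; // skip continuation lines without a timestamp
            }
            LocalDateTime current = LocalDateTime.parse(matcher.group(1), FORMAT);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "[ERROR] [04/09/2019 11:54:48.330] [...] Failed to serialize oversized message [...]",
                "[ERROR] [04/09/2019 11:54:56.824] [...] Failed to serialize oversized message [...]",
                "[ERROR] [04/09/2019 11:55:02.724] [...] Failed to serialize oversized message [...]");
        // Prints [8494, 5900]
        System.out.println(gapsMillis(sample, "Failed to serialize oversized message"));
    }
}
```

For the first three errors the gaps are roughly 8.5 s and 5.9 s, well above the 2 s gossip interval in the configuration.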

My approach may well be wrong, since I have only been working with Akka ddata for two weeks. If anyone knows why this is happening, a possible cause, or a possible solution, please help me. Thank you.
