Ниже код выдает OOO на экземпляре EC2 в течение 15 минут после запуска (конфигурация java xms 1024 xmx2G), но не выдает никакой ошибки при работе на intellij.
SqsSource(queueUrl,
//parallelism = maxBufferSize / maxBatchSize 20 10
SqsSourceSettings().withWaitTime(10 seconds)
.withMaxBatchSize(10).withMaxBufferSize(20)
).map {
msg => {
val out = Source.single(msg)
.via(messageToLambdaRequest)
.via(lambdaRequestToLambdaResp)
.via(lambdaRespToAggregationKeySet)
.via(workFlow)
.log("error while consuming events internally.")
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
val reducedResponse = out.map(response => {
response.foldLeft[Response](OK)((a, b) =>
if (a == OK && b == OK) OK else NotOK)
})
val messageAction = reducedResponse
.map(res =>
if (res == OK) {
//log.info("finally response is OK hence message action delete is prepared. {}.", msg.messageId())
delete(msg)
} else
ignore(msg)
)
messageAction
}
}
.mapAsync(1)(identity)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
// For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink)
// must be less than or equal to the thread pool size.
.log("error log")
.runWith(SqsAckSink(queueUrl, SqsAckSettings(1)))
}
Я пробовал это с 1.0-M3и 1.0-RC1 оба.Есть ли решение этой проблемы?
Топ-5 гистограмм создания объектов с использованием jhat -
Class Instance Count Total Size
class [C 1376284 2068640582
class software.amazon.awssdk.services.sqs.model.Message 332718 18632208
class java.lang.String 1375675 16508100
class [Lakka.dispatch.forkjoin.ForkJoinTask; 227 14880304
class scala.collection.immutable.$colon$colon 334396 5350336
Также я нашел подобную проблему здесь - https://github.com/akka/alpakka/issues/1588
Мне интересно, есть ли альтернатива для решения этой проблемы.