Я пытаюсь выполнить разбиение одного Observable
в Monix по ключу, затем группировать до последних n
событий в каждом GrouppedObservable
и отправлять их для дальнейшей обработки. Проблема в том, что количество ключей для группировки, возможно, бесконечно, и это приводит к утечкам памяти.
Контекст приложения :
У меня есть поток kafka с сообщениями из многих разговоров. Каждый диалог имеет roomId
, и я хочу сгруппировать этот идентификатор, чтобы получить коллекцию Observables, каждый из которых содержит только сообщения из одного разговора.
Комнаты для разговоров, как правило, недолговечны, т. Е. Создается новый разговор с уникальным roomId
, за короткий промежуток времени происходит обмен несколькими десятками сообщений, затем разговор закрывается.
Чтобы избежать утечек памяти, я хочу сохранить буферы только 100-1000 самых последних разговоров и удалить старые. Таким образом, если событие происходит из долгого невидимого разговора, оно будет рассматриваться как новый разговор, потому что буфер с его предыдущими сообщениями будет забыт.
Метод groupBy в Monix имеет аргумент keysBuffer
, который определяет, как обращаться с ключевыми буферами.
Я думал, что указание стратегии от keyBuffer
до DropOld позволит мне добиться желаемого поведения.
Ниже приведена упрощенная версия описанного варианта использования.
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
import scala.concurrent.duration._
import scala.util.Random
case class Event(key: Key, value: String, seqNr: Int) {
override def toString: String = s"(k:$key;s:$seqNr)"
}
case class Key(conversationId: Int, messageNr: Int)
object Main {
def main(args: Array[String]): Unit = {
val fakeConsumer = Consumer.foreach(println)
val kafkaSimulator = Observable.interval(1.millisecond)
.map(n => generateHeavyEvent(n.toInt))
val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
.mergeMap(slidingWindow)
groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
}
def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
source.scan(List.empty[T])(fixedSizeList)
def fixedSizeList[T](list: List[T], elem: T): List[T] =
(list :+ elem).takeRight(5)
def generateHeavyEvent(n: Int): Event = {
val conversationId: Int = n / 500
val messageNr: Int = n % 5
val key = Key(conversationId, messageNr)
val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
Event(key, value, n)
}
}
Однако наблюдение за кучей приложений в VisualVM указывает на утечку памяти. Примерно через 30 минут бега я получил java.lang.OutOfMemoryError: GC overhead limit exceeded
Ниже приведен скриншот графиков использования кучи с изображением запуска моего приложения в течение 30 минут. (Уплощенная часть в конце после OutOfMemoryError
)
VisualVM Куча, область применения
Мой вопрос: Как я могу сгруппировать события в monix, возможно, бесконечным количеством ключей без утечки памяти? Старые ключи могут быть сброшены
Справочная информация:
- моникс версия:
3.0.0-RC2
- версия Scala:
2.12.8