Наблюдаемая группа Monix по большому количеству ключей без утечек памяти - PullRequest
0 голосов
/ 06 января 2019

Я пытаюсь выполнить разбиение одного Observable в Monix по ключу, затем группировать до последних n событий в каждом GrouppedObservable и отправлять их для дальнейшей обработки. Проблема в том, что количество ключей для группировки, возможно, бесконечно, и это приводит к утечкам памяти.

Контекст приложения :

У меня есть поток kafka с сообщениями из многих разговоров. Каждый диалог имеет roomId, и я хочу сгруппировать этот идентификатор, чтобы получить коллекцию Observables, каждый из которых содержит только сообщения из одного разговора. Комнаты для разговоров, как правило, недолговечны, т. Е. Создается новый разговор с уникальным roomId, за короткий промежуток времени происходит обмен несколькими десятками сообщений, затем разговор закрывается. Чтобы избежать утечек памяти, я хочу сохранить буферы только 100-1000 самых последних разговоров и удалить старые. Таким образом, если событие происходит из долгого невидимого разговора, оно будет рассматриваться как новый разговор, потому что буфер с его предыдущими сообщениями будет забыт.

Метод groupBy в Monix имеет аргумент keysBuffer, который определяет, как обращаться с ключевыми буферами.

Я думал, что указание стратегии от keyBuffer до DropOld позволит мне добиться желаемого поведения.

Ниже приведена упрощенная версия описанного варианта использования.

import monix.execution.Scheduler.Implicits.global
import monix.reactive._

import scala.concurrent.duration._
import scala.util.Random

case class Event(key: Key, value: String, seqNr: Int) {
  override def toString: String = s"(k:$key;s:$seqNr)"
}

case class Key(conversationId: Int, messageNr: Int)

object Main {
  def main(args: Array[String]): Unit = {

    val fakeConsumer = Consumer.foreach(println)
    val kafkaSimulator = Observable.interval(1.millisecond)
      .map(n => generateHeavyEvent(n.toInt))

    val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
      .mergeMap(slidingWindow)

    groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
  }

  def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
    source.scan(List.empty[T])(fixedSizeList)

  def fixedSizeList[T](list: List[T], elem: T): List[T] =
    (list :+ elem).takeRight(5)

  def generateHeavyEvent(n: Int): Event = {
    val conversationId: Int = n / 500
    val messageNr: Int = n % 5
    val key = Key(conversationId, messageNr)
    val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
    Event(key, value, n)
  }

}

Однако наблюдение за кучей приложений в VisualVM указывает на утечку памяти. Примерно через 30 минут бега я получил java.lang.OutOfMemoryError: GC overhead limit exceeded

Ниже приведен скриншот графиков использования кучи с изображением запуска моего приложения в течение 30 минут. (Уплощенная часть в конце после OutOfMemoryError)

VisualVM Куча, область применения

Мой вопрос: Как я могу сгруппировать события в monix, возможно, бесконечным количеством ключей без утечки памяти? Старые ключи могут быть сброшены

Справочная информация:

  • моникс версия: 3.0.0-RC2
  • версия Scala: 2.12.8
...