Дозирование / Обрамление в Akka Stream - PullRequest
0 голосов
/ 21 октября 2019

У меня есть Source[Animal], где Animal имеет 2 типа Cat и Dog. source это что-то вроде dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ... Я пытаюсь преобразовать его в следующее Source[Seq[Animal]] - (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ... Как это работает:

  • не более 3 собак в партии, не более 1 кошки в партии(в качестве альтернативы можно также решить следующее: не более 4 животных на одну партию, не более 1 кошки на партию)
  • кошка должна быть только последним (иначе говоря, обрамляющим) элементом в партии
  • также, я не могу показать скорость в примере, но должен быть тайм-аут, после которого партия (даже если не полная и без кошки) все еще испускается. Что-то вроде groupedWithin(4, FiniteDuration(3, SECONDS))
  • общий порядок важен и его необходимо поддерживать

Я пробовал что-то с batchWeighted и groupedWithin, но у меня нет правильногорешения пока нет.

Одна из идей, которую я попробовал, состояла в том, чтобы взвесить Dog как 1 и Cat как 1000 и использовать batchWeighted с max weight = 1003, но это не гарантирует, что Cat всегда является последним элементом пакета ... Попытка сделать то же самое с max weight = 3 всегда ставит Cat в отдельные группы.

Если был гибрид batchWithin и takeWhile (без завершения)тогда он, возможно, решил этот вариант использования.

Это довольно прямолинейная вещь, которую нужно решить, если она просто повторяется по List, но из-за ограничения на использование FlowOps делает это немного сложным

Редактировать: В настоящее время я делаю следующее:

  .groupedWithin(4, FiniteDuration(4, SECONDS))
  .map(frameBatch(_, Vector(), 0))
  // groupedWithin internally returns a Vector so is fast for indexed operations

  @tailrec
  private def frameBatch(
      items: Seq[Animal],
      result: Vector[Seq[Animal]],
      offset: Int
    ): Vector[Seq[Animal]] = {
    val index = seq.indexWhere(!_.isDog, offset) // assume there's an isDog() for simplicity
    if (index == -1) {
      if (offset == 0) {
        Vector(items)
      } else {
        result :+ items.slice(offset, items.size)
      }
    } else {
      frameBatchAtSyncs(items, result :+ items.slice(offset, index), index + 1)
    }
  }

1 Ответ

1 голос
/ 21 октября 2019

Это можно сделать только примитивами Akka Stream (с небольшим обходом актеров Akka):

object BatchFrame {
  def batchFrame[M](
    source: Source[Animal, M],
    batchSize: Int,
    interval: FiniteDuration)(implicit system: ActorSystem): Source[Seq[Animal], M] = {

    require(batchSize > 0)

    import system.dispatcher

    implicit val materializer = ActorMaterializer()

    val dataSource = source.map(x => Some(x))
    val (timerRef, timerSource) = Source.actorRef[Any](1, OverflowStrategy.dropHead).map(_ => None).preMaterialize()

    val merged = dataSource.merge(timerSource, eagerComplete = true)

    var nextTick: Option[Cancellable] = None

    def scheduleTick(): Unit = {
      nextTick = nextTick.flatMap { c => c.cancel(); None }
      nextTick = Option(system.scheduler.scheduleOnce(interval, timerRef, None))
    }

    scheduleTick()

    merged.statefulMapConcat{ () =>
      var dogCount = 0
      var frame: List[Animal] = Nil

      def emit(): List[Seq[Animal]] = {
        scheduleTick()
        val ret = List(frame.reverse)
        dogCount = 0
        frame = Nil
        ret
      }

      def emitWith(a: Animal): List[Seq[Animal]] = {
        frame = a :: frame
        emit()
      }

      in: Option[Animal] => {
        in match {
          case Some(cat: Cat) =>
            emitWith(cat)
          case Some(dog: Dog) if dogCount < (batchSize - 1) =>
            dogCount += 1
            frame = dog :: frame
            Nil
          case Some(dog: Dog) =>
            emitWith(dog)
          case _ =>
            emit()
        }
      }
    }
  }
}

Основной трюк (который мне пришлось искать и экспериментировать, чтобы доказать этоя сам) должен предварительно определить сроки Source, чтобы у вас было ActorRef для планирования тиков.

...