Воспроизведение собранных данных в реальном времени для имитации реальных задержек трафика и упорядочения сообщений - PullRequest
1 голос
/ 03 июня 2019

Имеет два входных потока, каждый из которых создает экземпляры объектов, определенные как

case class ReplayData(timestamp:Long, payload:Any)

Поток 1

1, полезная нагрузка1

1000, полезная нагрузка3

Поток 2

400, полезная нагрузка2

1500, полезная нагрузка4

Я хотел бы реализовать механизм воспроизведения, который будет выдвигать элементы вниз по потоку, упорядоченные по временной метке, которую я имею на каждом элементе

Он будет имитировать живые сценарии с производства.

Этот механизм должен подчиняться задержкам между сообщениями , например. первое отправленное сообщение - это payload1 (его начальная точка), payload2 из Stream2 должно быть отправлено через 400 мс (разница между отметкой времени следующего сообщения и отметкой времени исходного сообщения) и т. д.

Я могу сделать это довольно легко, используя DelayedQueue , использование которого объясняется в этой SO теме

Неограниченная очередь блокировки отложенных элементов, в которой элемент может быть принято только после истечения срока его задержки.

Главой очереди является тот элемент Delayed, задержка которого истекла самый дальний в прошлом. Если нет задержки, нет головы и опрос вернет ноль.

Истечение срока действия происходит, когда элемент getDelay (TimeUnit.NANOSECONDS) Метод возвращает значение, меньшее или равное нулю. Даже если Неиспользованные элементы не могут быть удалены с помощью take или poll, они иначе рассматривается как нормальные элементы.

Например, метод size возвращает счетчик как истекших, так и неистекшие элементы. Эта очередь не разрешает нулевые элементы. делает не разрешать нулевые элементы.

Я пытаюсь понять, как это сделать в потоках Акка, но не могу найти то, что решило бы эту проблему для меня.

Я рассматривал mergeSorted как способ объединения двух потоков и упорядочения их по какой-либо функции.

И, похоже, более или менее подходит для этой цели упорядочения на основе какой-то пользовательской функции.

Я не уверен, как обрабатывать задержки между элементами на основе свойства timestamp .

Используя простой старый AKKA, я могу использовать планировщик для чтения данных, упорядочивать их и планировать каждый элемент для отправки, когда пройдет время.

1 Ответ

1 голос
/ 04 июня 2019

Я не помню ничего в akka-streams, которое могло бы задерживать сообщения из коробки с пользовательской задержкой для каждого сообщения. В конце концов, идея akka-streams - реактивное программирование. я знаю только 2 варианта, как вообще решить вашу проблему (при условии, что вы уже объединили 2 источника)

  1. Flow.mapAsync - в этом случае ваша задача вернуть Future после некоторой задержки. например:

    import java.time.LocalDateTime
    import java.util.concurrent.Executors
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.pattern.after
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{ExecutionContext, Future}
    
    object Application extends App {
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      case class SomeEntity(time: Int, value: Int)
      val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400)).map(i => SomeEntity(i, i * i + 3))
    
      val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
      val scheduler = sys.scheduler
    
      val f = source
        .mapAsync(10)(se => after(se.time.milliseconds, scheduler)(Future.successful(se))(ec))
        .runForeach(se => println(s"${LocalDateTime.now()} -> $se"))
    
      f.onComplete(_ => sys.terminate())
    }
    
  2. Может случиться, что ваш сценарий использования (в конце концов, имитация) на самом деле не такой строгий, поэтому вы можете использовать Flow.throttle . Это не так просто и точно, как 1-е решение, но гораздо более производительно, потому что для управления скоростью вывода предметов используется некоторая облегченная модель корзины.

    import java.time.LocalDateTime
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    
    object Application extends App {
    
      implicit val sys: ActorSystem = ActorSystem()
      implicit val mat: ActorMaterializer = ActorMaterializer()
    
      case class SomeEntity(time: Int, value: Int)
    
      val source: Source[SomeEntity, NotUsed] = Source(List(100, 200, 400, 1000, 1100, 1400, 1400, 1500, 1900, 2500, 2700)).map(i => SomeEntity(i, i * i + 3))
    
    
      val future = source.throttle(cost = 1, per = 1.millisecond, _.time).runForeach(se => {
        println(s"${LocalDateTime.now()} -> $se")
      })
    
      future.onComplete(_ => sys.terminate())
    }
    
...