Обработка отброшенного сообщения в потоках Akka - PullRequest
1 голос
/ 09 марта 2019

У меня есть следующее определение очереди источника.

lazy val (processMessageSource, processMessageQueueFuture) =
   peekMatValue(
      Source
        .queue[(ProcessMessageInputData, Promise[ProcessMessageOutputData])](5, OverflowStrategy.dropNew))


def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M])  {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }  
    (s, p.future)
  }

Класс входных данных сообщения процесса по сути является артефактом, который создается, когда вызывающий абонент вызывает конечную точку веб-сервера, которая подключается к этому потоку (т. Е. Бизнес-логика конечной точки службы помещает сообщения в эту очередь),Вывод сообщения «Обещание процесса» - это то, что завершается вниз по потоку в приемнике приложения, и веб-сервер затем получает полный ответный вызов в этом будущем, чтобы вернуть ответ обратно.

Есть и другие источники входа в этот поток.

Теперь буфер может быть зарезервирован, так как другой источник может перегружать систему, вызывая тем самым обратное давление потока.Существующий код просто удаляет новое сообщение.Но я все еще хочу завершить процесс вывода сообщения об обещании завершить с исключением, указывающим что-то вроде "Throttled".

Существует ли механизм для написания пользовательской стратегии переполнения или постобработки на переполненном элементе, который позволяет мне это делать?

1 Ответ

0 голосов
/ 14 марта 2019

Согласно https://github.com/akka/akka/blob/master/akkastream/src/main/scala/akka/stream/impl/QueueSource.scala#L83

dropNew будет работать просто отлично.На стороне клиента это будет выглядеть так.

processMessageQueue.offer(in, pr).foreach { res =>
  res match {
    case Enqueued => // Code to handle case when successfully enqueued. 
    case Dropped => // Code to handle messages that are dropped since the buffier was overflowing. 
  }
}
...