У меня есть следующее определение очереди источника.
lazy val (processMessageSource, processMessageQueueFuture) =
peekMatValue(
Source
.queue[(ProcessMessageInputData, Promise[ProcessMessageOutputData])](5, OverflowStrategy.dropNew))
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
Класс входных данных сообщения процесса по сути является артефактом, который создается, когда вызывающий абонент вызывает конечную точку веб-сервера, которая подключается к этому потоку (т. Е. Бизнес-логика конечной точки службы помещает сообщения в эту очередь),Вывод сообщения «Обещание процесса» - это то, что завершается вниз по потоку в приемнике приложения, и веб-сервер затем получает полный ответный вызов в этом будущем, чтобы вернуть ответ обратно.
Есть и другие источники входа в этот поток.
Теперь буфер может быть зарезервирован, так как другой источник может перегружать систему, вызывая тем самым обратное давление потока.Существующий код просто удаляет новое сообщение.Но я все еще хочу завершить процесс вывода сообщения об обещании завершить с исключением, указывающим что-то вроде "Throttled".
Существует ли механизм для написания пользовательской стратегии переполнения или постобработки на переполненном элементе, который позволяет мне это делать?