Итак, я не уверен, что понимаю ваш вариант использования. Вы спрашиваете о противодавлении на .buffer
или на .throttle
? Другая часть моего замешательства заключается в том, что вы предлагаете испустить новый «элемент управления» в ситуации, когда поток уже подвергается давлению. Таким образом, ваш элемент управления может быть не получен в течение некоторого времени. Кроме того, если вы посылаете элемент управления каждый раз, когда получаете обратное давление, вы, вероятно, создадите поток элементов управления.
Один из способов создать это (чрезмерно наивное) решение - использовать conflate .
val simpleSink: Sink[String, Future[Done]] =
Sink.foreach(e => println(s"simple: $e"))
val cycleSource: Source[String, NotUsed] =
Source.cycle(() => List("1", "2", "3", "4").iterator).throttle(5, 1.second)
val conflateFlow: Flow[String, String, NotUsed] =
Flow[String].conflate((a, b) => {
"BACKPRESSURE CONTROL ELEMENT"
})
val backpressureFlow: Flow[String, String, NotUsed] =
Flow[String]
.buffer(10, OverflowStrategy.backpressure) throttle (2, 1.second)
val backpressureTest =
cycleSource.via(conflateFlow).via(backpressureFlow).to(simpleSink).run()
Чтобы превратить это в более удобный пример, вы можете:
Сделать какой-то вызов внутри .conflate
(а затем просто сбросить один из элементы). Будьте осторожны, чтобы ничего не блокировать. Возможно, просто отправьте сообщение, которое может быть дублировано в другом месте.
Напишите этап пользовательского графика . Делать что-то простое, как это, не было бы слишком сложно.
Я думаю, что мне придется больше понимать, что касается варианта использования. Посмотрите на всех готовых операторов, реагирующих на противодавление , и посмотрите, поможет ли один из них.