Почему этот код работает без сбоев?
Причина - обратное давление. Источник не будет производить больше элементов, чем может потреблять ваш приемник, поэтому медленный приемник напрямую влияет на скорость создания элементов. Ваш буфер никогда не переполняется в результате.
Как я могу имитировать переполнение? (для учебных целей)
Имейте раковину, которая стремится потреблять, но в то же время медленная. Его можно эмулировать, добавив grouped(1000)
, который создает список из 1000 элементов и передает его вниз по течению.
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class StreamsBufJava {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("reactive-test");
Source<Integer, NotUsed> source =
Source.range(0, 10000000)
.buffer(1, OverflowStrategy.fail())
.grouped(1000)
.mapConcat(list -> list)
.map(StreamsBufJava::doubleInt);
source.runWith(Sink.foreach(System.out::println), system);
}
private static Integer doubleInt(int i) {
try {
Thread.sleep(2_000);
} catch (Exception e) {
System.out.println(e);
}
return 2 * i;
}
}
Производит
0
2
[ERROR] [04/17/2020 09:40:47.671] [reactive-test-akka.actor.default-dispatcher-5] [Buffer(akka://reactive-test)] Failing because buffer is full and overflowStrategy is: [Fail] in stream [class akka.stream.impl.fusing.Buffer$$anon$26]
4
6
8