Потоки Akka: моделирование потока с ошибкой с помощью OverflowStrategy.fail () - PullRequest
2 голосов
/ 17 апреля 2020

Я новичок в потоках Акка и Акка. Я создал фиктивный поток и ожидал, что он закончится исключением, поскольку моя функция map () очень медленная, и я установил буфер на 1.

Так что мой вопрос состоит из двух частей:

  1. Почему этот код работает без сбоев?
  2. Как я могу имитировать переполнение? (для учебных целей)
    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.stream.OverflowStrategy;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    public class Application {

        public static void main(String[] args) {
            final ActorSystem system = ActorSystem.create("reactive-test");
            Source<Integer, NotUsed> source =
                    Source.range(0, 10000000)
                    .buffer(1, OverflowStrategy.fail())
                    .map(Application::doubleInt);      
            source.runWith(Sink.foreach(a -> System.out.println(a)), system);
        }

        private static Integer doubleInt(int i) {
            try {
                Thread.sleep(2_000);
            } catch (Exception e) {
                System.out.println(e);
            }
            return 2 * i;
        }
    }

1 Ответ

4 голосов
/ 17 апреля 2020

Почему этот код работает без сбоев?

Причина - обратное давление. Источник не будет производить больше элементов, чем может потреблять ваш приемник, поэтому медленный приемник напрямую влияет на скорость создания элементов. Ваш буфер никогда не переполняется в результате.

Как я могу имитировать переполнение? (для учебных целей)

Имейте раковину, которая стремится потреблять, но в то же время медленная. Его можно эмулировать, добавив grouped(1000), который создает список из 1000 элементов и передает его вниз по течению.

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class StreamsBufJava {

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("reactive-test");
        Source<Integer, NotUsed> source =
                Source.range(0, 10000000)
                        .buffer(1, OverflowStrategy.fail())
                        .grouped(1000)
                        .mapConcat(list -> list)
                        .map(StreamsBufJava::doubleInt);


        source.runWith(Sink.foreach(System.out::println), system);
    }

    private static Integer doubleInt(int i) {
        try {
            Thread.sleep(2_000);
        } catch (Exception e) {
            System.out.println(e);
        }

        return 2 * i;
    }
}

Производит

0
2
[ERROR] [04/17/2020 09:40:47.671] [reactive-test-akka.actor.default-dispatcher-5] [Buffer(akka://reactive-test)] Failing because buffer is full and overflowStrategy is: [Fail] in stream [class akka.stream.impl.fusing.Buffer$$anon$26]
4
6
8
...