Потоки Akka не запускаются, когда в Source много записей - PullRequest
0 голосов
/ 10 мая 2018

Я пытаюсь написать очень простой вводный пример использования Akka Streams. Я пытаюсь в основном создать поток, который принимает диапазон целых чисел в качестве источника и отфильтровывает все целые числа, которые не являются простыми, производя поток простых целых чисел в качестве его вывода.

Класс, который создает поток, довольно прост; для этого у меня есть следующее.

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrimeStream {
    private final AverageRepository averageRepository = new AverageRepository();
    private final ActorSystem actorSystem;

    public PrimeStream(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    public Flow<Integer, Integer, NotUsed> filterPrimes() {
        return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
    }
}

Когда я запускаю следующий тест, он работает нормально.

private final ActorSystem actorSystem = ActorSystem.create("Sys");

@Test
public void testStreams() {
    Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
    Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
    flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}

Однако, когда я увеличиваю диапазон в 10 раз, меняя строку в тесте на следующую, она больше не работает.

Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);

Теперь при запуске теста не выдается никаких исключений, нет предупреждений. Он просто запускается, затем выходит, вообще не отображая текст на консоли.

Просто чтобы быть уверенным, что проблема не в моем тесте на простоту, я провел тест в том же диапазоне без использования Akka Streams, и он работает нормально. Следующий код работает без проблем.

@Test
public void testPlain() {
    List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
    List<Integer> out = PrimeKernel.filterPrimes(in);
    System.out.println(out);
}

Просто для ясности, сам тест на простоту берет список целых чисел и устанавливает любой элемент в списке на 0, если он не прост.

Как подсказывает @RamonJRomeroyVigil, если я удаляю часть mapConcat все вместе, но оставляю все то же самое, что фактически выдает 10 000 целых чисел. Однако, если я оставляю все то же самое, но просто заменяю filterPrimes методом, который просто возвращает параметр метода как есть, не касаясь его, тогда он вообще ничего не выводит на экран. Я также попытался добавить println в начало filterPrime, чтобы отладить его. Всякий раз, когда он не печатает какие-либо выходные данные, которые включают оператор отладки. Так что даже не делается попытка вызвать filterPrimes.

1 Ответ

0 голосов
/ 10 мая 2018

runForeach возвращает CompletionStage, поэтому, если вы хотите, чтобы все числа печатались, вам нужно дождаться CompletionStage, в противном случае функция тестирования возвращается и программа завершается без завершения CompletionStage.

Пример:

flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem)).toCompletableFuture().join();
...