Я пытаюсь написать очень простой вводный пример использования Akka Streams. Я пытаюсь в основном создать поток, который принимает диапазон целых чисел в качестве источника и отфильтровывает все целые числа, которые не являются простыми, производя поток простых целых чисел в качестве его вывода.
Класс, который создает поток, довольно прост; для этого у меня есть следующее.
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class PrimeStream {
private final AverageRepository averageRepository = new AverageRepository();
private final ActorSystem actorSystem;
public PrimeStream(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
public Flow<Integer, Integer, NotUsed> filterPrimes() {
return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
}
}
Когда я запускаю следующий тест, он работает нормально.
private final ActorSystem actorSystem = ActorSystem.create("Sys");
@Test
public void testStreams() {
Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}
Однако, когда я увеличиваю диапазон в 10 раз, меняя строку в тесте на следующую, она больше не работает.
Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);
Теперь при запуске теста не выдается никаких исключений, нет предупреждений. Он просто запускается, затем выходит, вообще не отображая текст на консоли.
Просто чтобы быть уверенным, что проблема не в моем тесте на простоту, я провел тест в том же диапазоне без использования Akka Streams, и он работает нормально. Следующий код работает без проблем.
@Test
public void testPlain() {
List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
List<Integer> out = PrimeKernel.filterPrimes(in);
System.out.println(out);
}
Просто для ясности, сам тест на простоту берет список целых чисел и устанавливает любой элемент в списке на 0, если он не прост.
Как подсказывает @RamonJRomeroyVigil, если я удаляю часть mapConcat все вместе, но оставляю все то же самое, что фактически выдает 10 000 целых чисел. Однако, если я оставляю все то же самое, но просто заменяю filterPrimes методом, который просто возвращает параметр метода как есть, не касаясь его, тогда он вообще ничего не выводит на экран. Я также попытался добавить println в начало filterPrime, чтобы отладить его. Всякий раз, когда он не печатает какие-либо выходные данные, которые включают оператор отладки. Так что даже не делается попытка вызвать filterPrimes.