Вы не можете отобразить исходящие порты из широковещательной рассылки в указанные c подпотоки (flow1 и flow2), и аналогичным образом вам необходимо сопоставить указанные c потоки (flow1 и flow2), объединяющиеся в стадии zip, с Speci c Порт Zip стадии.
Также я думаю, что не ясно, что ожидается от потока, который вы пишете. Этап zip вернет вам кортеж (int, int), поэтому вывод zip в потоке приведет к потоку кортежей. Но ваш приемник, который должен быть добавлен после zip, не принимает поток кортежей, а поток целых чисел
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("test");
ActorMaterializer materializer = ActorMaterializer.create(system);
Source<Integer, NotUsed> source = Source.range(1, 100);
Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class).map(i -> i + 1);
Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class).map(i -> i * 2);
//create a new zip stage which accepts
//Zip<?, ?, ?> zip1 =
final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip = builder.add(Zip.create());
Sink<List<Integer>, CompletionStage<Integer>> sink = Sink.fold(0, ((arg1, arg2) -> {
int value = arg1.intValue();
for (Integer i : arg2) {
value += i.intValue();
}
return value;
}));
RunnableGraph<Integer> graph = RunnableGraph.fromGraph(GraphDSL.create(flow1, flow2, sink,
(builder, flow1, flow2, sink) -> {
UniformFanOutShape fanOutShape = builder.add(Broadcast.create(2));
UniformFanInShape fanInShape = builder.add(Zip.create());
builder.from(builder.add(source))
.viaFanOut(fanOutShape)
builder
.from(broadcast.out(0))
.via(builder.add(flow1))
.toInlet(zip.in0());
builder
.from(broadcast.out(1))
.via(builder.add(flow2))
.toInlet(zip.in1());
builder
.from(zip.out()).toInlet(sink)
}
));
}
. Вы можете проверить ссылку ниже для большего количества примеров. https://github.com/Cs4r/akka-examples/blob/master/src/main/java/cs4r/labs/akka/examples/ConstructingGraphs.java