Как правильно построить Akka Graph - PullRequest
0 голосов
/ 09 февраля 2020

Я хочу создать график, который имеет источник, и этот источник связан с широковещательной рассылкой, которая разветвляется через два потока, а затем вывод архивируется в приемник.

Я сделал почти все, но у меня есть две проблемы:

  • Строитель не принимает мою форму FanIn
  • Я предоставляю раковину, но требуется раковина формы, и я не знаю, как ее получить. что

     public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("test");
        ActorMaterializer materializer = ActorMaterializer.create(system);
    
        Source<Integer, NotUsed> source = Source.range(1, 100);
        Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class).map(i -> i + 1);
        Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class).map(i -> i * 2);
        Sink<List<Integer>, CompletionStage<Integer>> sink = Sink.fold(0, ((arg1, arg2) -> {
            int value = arg1.intValue();
            for (Integer i : arg2) {
                value += i.intValue();
            }
            return value;
        }));
        RunnableGraph<Integer> graph = RunnableGraph.fromGraph(GraphDSL.create(
                (builder) -> {
                    UniformFanOutShape fanOutShape = builder.add(Broadcast.create(2));
                    UniformFanInShape fanInShape = builder.add(Zip.create());
                    return builder.from(builder.add(source))
                            .viaFanOut(fanOutShape)
                            .via(builder.add(flow1))
                            .via(builder.add(flow2))
                            .viaFanIn(fanInShape)
                            .to(sink);
                }
        ));
    }
    

любая помощь приветствуется

1 Ответ

1 голос
/ 10 февраля 2020

Вы не можете отобразить исходящие порты из широковещательной рассылки в указанные c подпотоки (flow1 и flow2), и аналогичным образом вам необходимо сопоставить указанные c потоки (flow1 и flow2), объединяющиеся в стадии zip, с Speci c Порт Zip стадии.

Также я думаю, что не ясно, что ожидается от потока, который вы пишете. Этап zip вернет вам кортеж (int, int), поэтому вывод zip в потоке приведет к потоку кортежей. Но ваш приемник, который должен быть добавлен после zip, не принимает поток кортежей, а поток целых чисел

public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("test");
    ActorMaterializer materializer = ActorMaterializer.create(system);

    Source<Integer, NotUsed> source = Source.range(1, 100);
    Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class).map(i -> i + 1);
    Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class).map(i -> i * 2);
    //create a new zip stage which accepts
    //Zip<?, ?, ?> zip1 = 
        final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip = builder.add(Zip.create());
    Sink<List<Integer>, CompletionStage<Integer>> sink = Sink.fold(0, ((arg1, arg2) -> {
        int value = arg1.intValue();
        for (Integer i : arg2) {
            value += i.intValue();
        }
        return value;
    }));
    RunnableGraph<Integer> graph = RunnableGraph.fromGraph(GraphDSL.create(flow1, flow2, sink,
            (builder, flow1, flow2, sink) -> {
                UniformFanOutShape fanOutShape = builder.add(Broadcast.create(2));
                UniformFanInShape fanInShape = builder.add(Zip.create());
                builder.from(builder.add(source))
                        .viaFanOut(fanOutShape)
                builder
                   .from(broadcast.out(0))
                   .via(builder.add(flow1))
                   .toInlet(zip.in0());
                builder
                   .from(broadcast.out(1))
                   .via(builder.add(flow2))
                   .toInlet(zip.in1());
               builder
                  .from(zip.out()).toInlet(sink)

            }
    ));
}

. Вы можете проверить ссылку ниже для большего количества примеров. https://github.com/Cs4r/akka-examples/blob/master/src/main/java/cs4r/labs/akka/examples/ConstructingGraphs.java

...