Рассмотрим следующий сценарий:
Мы хотим получить большую распределенную коллекцию объектов, и для каждого объекта в коллекции мы хотим запустить другое вычисление, которое использует текущий объект, и другую большую распределенную коллекцию для вычисления результата, который преобразует текущий объект.
* 1005 Е.Г. *
коллекция А: 1,2,3,4,5,6,7,8 ......
коллекция B: 1,2,3,4,5,6,7,8 ......
Для каждого значения в А,
мы перебираем все значения в B, умножая каждое на 2 и суммируя эти значения,
мы сопоставляем каждое значение в A с этой суммой, умноженной на текущее значение A.
Ниже моя попытка, которая приводит к тупику, когда используется следующее:
c2.newJob(p2).join()
нет тупика, когда используется следующее:
c2.newJob(p2)
, однако мы хотим, чтобы p2 завершил работу, чтобы убедиться, что мы получаем правильную сумму.
Это может показаться не идиоматическим способом использования Jet для этого конкретного случая использования, однако я хочу использовать этот шаблон для решения других проблем, и поэтому я был бы очень признателен за вашу помощь в этом.
JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();
Pipeline p1 = Pipeline.create();
List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);
p1.drawFrom(Sources.list("a-in"))
.map(e -> {
Pipeline p2 = Pipeline.create();
JetInstance c2 = Jet.newJetClient();
List<Integer> bIn = c2.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);
p2.drawFrom(Sources.list("b-in"))
.map(i->((Integer)i)*2)
.drainTo(Sinks.list("b-out"));
List<Integer> bOut = c2.getList("b-out");
// I would have thought it should just wait for the computation to complete,
// instead the join here causes jet to block itself,
c2.newJob(p2).join();
int sum = 0;
for (Integer i : bOut){
sum+=i;
}
return ((Integer)e)*sum;
}).drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();