Можно ли вложить конвейеры Hazelcast JET так, чтобы внутренний конвейер мог вычислять результаты для внешнего конвейера? - PullRequest
0 голосов
/ 13 марта 2019

Рассмотрим следующий сценарий:

Мы хотим получить большую распределенную коллекцию объектов, и для каждого объекта в коллекции мы хотим запустить другое вычисление, которое использует текущий объект, и другую большую распределенную коллекцию для вычисления результата, который преобразует текущий объект.

* 1005 Е.Г. *

коллекция А: 1,2,3,4,5,6,7,8 ......

коллекция B: 1,2,3,4,5,6,7,8 ......

Для каждого значения в А, мы перебираем все значения в B, умножая каждое на 2 и суммируя эти значения, мы сопоставляем каждое значение в A с этой суммой, умноженной на текущее значение A.

Ниже моя попытка, которая приводит к тупику, когда используется следующее:

c2.newJob(p2).join()

нет тупика, когда используется следующее:

c2.newJob(p2)

, однако мы хотим, чтобы p2 завершил работу, чтобы убедиться, что мы получаем правильную сумму.

Это может показаться не идиоматическим способом использования Jet для этого конкретного случая использования, однако я хочу использовать этот шаблон для решения других проблем, и поэтому я был бы очень признателен за вашу помощь в этом.

JetInstance jet =  Jet.newJetInstance();
JetInstance c1 =  Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn =  jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

p1.drawFrom(Sources.list("a-in"))
        .map(e -> {
          Pipeline p2 = Pipeline.create();
          JetInstance c2 =  Jet.newJetClient();

          List<Integer> bIn = c2.getList("b-in");
          bIn.add(1);
          bIn.add(2);
          bIn.add(3);

          p2.drawFrom(Sources.list("b-in"))
                  .map(i->((Integer)i)*2)
                  .drainTo(Sinks.list("b-out"));

          List<Integer> bOut = c2.getList("b-out");

          // I would have thought it should just wait for the computation to complete,
          // instead the join here causes jet to block itself,
          c2.newJob(p2).join();

          int sum = 0;
          for (Integer i : bOut){
            sum+=i;
          }

          return ((Integer)e)*sum;
        }).drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();

Ответы [ 2 ]

1 голос
/ 14 марта 2019

В вашем коде несколько проблем:

  1. функция map не должна блокироваться. В следующей версии мы добавляем mapUsingContextAsync, где вы можете использовать клиентское соединение в качестве контекста, отправить работу и вернуть job.getFuture().

  2. операции map будут выполняться параллельно. Вы должны убедиться, что они не делятся временным списком. В ваших примерах все подзадачи используют b-out и перезаписывают данные друг друга.

  3. Причина тупика заключалась в следующем: join() в map() заблокировал работника кооператива и ждал завершения подзадачи, но подзадача не может быть завершена из-за заблокированного работника кооператива нить.

Кроме того, Jet не оптимизирован для очень небольших пакетных заданий, но я думаю, что ваша реальная работа больше. Там довольно много накладных расходов для развертывания работы; если само задание выполняется только в течение нескольких мс, накладные расходы являются существенными. В этом конкретном случае вам лучше использовать list.stream().map(i->i*2).sum() вместо подзадачи.

JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

List<Integer> bIn = jet.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);

p1.drawFrom(Sources.list("a-in"))
  .mapUsingContextAsync(
          ContextFactory
                  .withCreateFn(inst -> tuple2(inst, inst.<UUID, Long>getMap("tmpResults")))
                  // mark as non-cooperative, job submission does some blocking
                  .toNonCooperative()
                  .withLocalSharing()
                  .withMaxPendingCallsPerProcessor(2)
                  .withDestroyFn(ctx -> ctx.f1().destroy()),
          (ctx, item) -> {
              Pipeline p2 = Pipeline.create();
              JetInstance instance = ctx.f0();
              UUID key = UUID.randomUUID();
              IMapJet<UUID, Long> tmpResultsMap = ctx.f1();

              p2.drawFrom(Sources.list("b-in"))
                .map(i -> ((Integer) i) * 2L)
                .aggregate(summingLong(Long::longValue))
                .map(sum -> entry(key, sum))
                .drainTo(Sinks.map(tmpResultsMap));

              return instance.newJob(p2).getFuture()
                             .thenApply(r -> entry(item, tmpResultsMap.remove(key)));
          })
  .drainTo(Sinks.list("a-out"));

c1.newJob(p1).join();
jet.getList("a-out").forEach(System.out::println);

Это печатает следующий вывод:

1=12
2=12
3=12

Приведенный выше код работает в текущем снимке и должен работать в Jet 3.0, срок которого истекает через недели.

1 голос
/ 14 марта 2019

@ newlogic, попробуйте этот подход:

  1. Создайте задание, которое читает из b-in и пишет в b-out карту, а не список. Вы можете использовать известный ключ или просто использовать временную метку и т. Д. В качестве ключа и определить TTL для этой таблицы, чтобы удалить старые результаты.
  2. Создайте прослушиватель на таблице b-out (локальный прослушиватель, так что будет уведомлен только узел, который содержит обновленный ключ) для прослушивания событий entryAdded / Updated, зависит от того, что вы выбрали на первом шаге, и отправьте новую работу из этого метод слушателя для обработки a-in.

Таким образом, вам не нужно ждать, когда первое задание будет завершено, оно автоматически вызовет второе задание.

...