Источник DAG возвращает значение false в emitFromTraverser, и процессор ожидает начала загрузки всех элементов источника перед началом обработки - PullRequest
0 голосов
/ 27 сентября 2018

USECASE

HazelcastJet версии 0.6.1 Hazelcast версии 3.10.2

С учетом этой (упрощенной версии) DAG

VERTICES

S1 Источник, который испускает 5 элементов типа A (считывается из БД с разбиением на разделы). Локальный параллелизм = 1

S2 Источник, который испускает 150K элементов типа B (итератор, который читает изБД в количестве 100 с разделением) Локальный параллелизм = 1

AD Процессор, который адаптирует типы A-> A1 и B-> B1 и излучает один за другим

FA Processors.filterP, который принимает только элементы типа A1 и испускает по одному

FB Processors.filterP, который принимает только элементы типа B1 и выдает один за другим

CL Процессор, который сначала накапливает все элементы типа А1, затем, когда получает элемент типа В1, обогащает его некоторыми сотрудниками, полученными из соответствующего А1, и излучает их один за другим.

WR Мойка с записью B1 Локальный параллелизм = 1

ПРИМЕЧАНИЕ. Просто чтобы придать смысл процессору фильтра: в группе обеспечения доступности баз данных есть другие источники, которые поступают в тот же адаптер AD и затем идут по другим путям, используя процессоры фильтра.

EDGES

S1-> AD

S2 -> AD

AD -> FA (от порядкового номера 0)

AD -> FB (от порядкового номера 1)

FA -> CL (к порядковому номеру 0 с приоритетом 0, распределенным и передаваемым)

FB -> CL (к порядковому номеру 1 с приоритетом 1)

CL -> WR

ПРОБЛЕМА

Если у источника S2 есть «несколько» элементов для загрузки (например, 15 КБ), emitFromTraverser никогда не возвращает false.

Если у источника S2 есть «много» элементов для загрузки (т.е.150K) emitFromTraverser возвращает false после:

  • Все элементы A1 были обработаны CL
  • Около 30% элементов B1 уже были переданы в CL, но никто не был обработанCL (DiagnosticProcessor регистрирует, что элемент отправляется в CL, но не обрабатывается)

Код S2 для справки:

protected void init(Context context) throws Exception {
    super.init(context);
    this.iterator = new BQueryIterator(querySupplier, batchSize);
    this.traverser = Traversers.traverseIterator(this.iterator);
}

public boolean complete() {
    boolean result = emitFromTraverser(this.traverser);
    return result;
}

ВОПРОС

  • Правильно ли, что CL не обрабатывает элементы до конца источника?
  • Правильно ли использование приоритета + распределенный + широковещательный в CL Vertex?

UPDATE

Кажется, что completeEdge на CL edge 1 никогда не вызывается.Кто-то может сказать мне, почему?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

Вы страдаете от тупика, вызванного приоритетом.Ваша группа обеспечения доступности баз данных ветвится из AD, а затем возвращается в CL, но с приоритетом.

AD --+---- FA ----+-- CL
      \          /
       +-- FB --+

Установка приоритета приводит к тому, что ни один элемент с краем с более низким приоритетом не обрабатывается до всех элементов с более высоким-приоритетные края обрабатываются.AD будет в конечном итоге заблокировано противодавлением от пути с более низким приоритетом, который не обрабатывается CL.Таким образом, AD заблокирован, потому что он не может излучать на край с более низким приоритетом, а CL заблокирован, потому что он все еще ожидает элементы с края с более высоким приоритетом, что приводит к тупику.

В вашем случае,Вы можете решить это, сделав 2 AD вершин, каждый обрабатывая элементы из одного из источников:

S1 --- AD1 ----+--- CL
              /
S2 --- AD2 --+
0 голосов
/ 28 сентября 2018

Через некоторое время я понял, в чем проблема ...

Процессор CL не может знать, когда все элементы A1 были обработаны, потому что все элементы поступают из процессора AD.Поэтому необходимо дождаться всех источников, поступающих из AD, прежде чем начинать обработку элементов B1.

Не уверен, но, вероятно, после загрузки большого количества элементов B все буферы входящих сообщений в DAG заполнятся и не смогут принятьлюбой другой B из S2, но в то же время не может обработать элементы B1 для продолжения: это тупик.

Может быть, DAG сможет обнаружить это?Я не знаю Джета так глубоко, но было бы неплохо получить это предупреждение.

Может быть, есть какая-то регистрация, чтобы включить?

Я надеюсь, что кто-то может подтвердить мой ответ и предложить, как улучшить и обнаружить эти проблемы.

...