USECASE
HazelcastJet версии 0.6.1 Hazelcast версии 3.10.2
С учетом этой (упрощенной версии) DAG
VERTICES
S1 Источник, который испускает 5 элементов типа A (считывается из БД с разбиением на разделы). Локальный параллелизм = 1
S2 Источник, который испускает 150K элементов типа B (итератор, который читает изБД в количестве 100 с разделением) Локальный параллелизм = 1
AD Процессор, который адаптирует типы A-> A1 и B-> B1 и излучает один за другим
FA Processors.filterP, который принимает только элементы типа A1 и испускает по одному
FB Processors.filterP, который принимает только элементы типа B1 и выдает один за другим
CL Процессор, который сначала накапливает все элементы типа А1, затем, когда получает элемент типа В1, обогащает его некоторыми сотрудниками, полученными из соответствующего А1, и излучает их один за другим.
WR Мойка с записью B1 Локальный параллелизм = 1
ПРИМЕЧАНИЕ. Просто чтобы придать смысл процессору фильтра: в группе обеспечения доступности баз данных есть другие источники, которые поступают в тот же адаптер AD и затем идут по другим путям, используя процессоры фильтра.
EDGES
S1-> AD
S2 -> AD
AD -> FA (от порядкового номера 0)
AD -> FB (от порядкового номера 1)
FA -> CL (к порядковому номеру 0 с приоритетом 0, распределенным и передаваемым)
FB -> CL (к порядковому номеру 1 с приоритетом 1)
CL -> WR
ПРОБЛЕМА
Если у источника S2 есть «несколько» элементов для загрузки (например, 15 КБ), emitFromTraverser никогда не возвращает false.
Если у источника S2 есть «много» элементов для загрузки (т.е.150K) emitFromTraverser возвращает false после:
- Все элементы A1 были обработаны CL
- Около 30% элементов B1 уже были переданы в CL, но никто не был обработанCL (DiagnosticProcessor регистрирует, что элемент отправляется в CL, но не обрабатывается)
Код S2 для справки:
protected void init(Context context) throws Exception {
super.init(context);
this.iterator = new BQueryIterator(querySupplier, batchSize);
this.traverser = Traversers.traverseIterator(this.iterator);
}
public boolean complete() {
boolean result = emitFromTraverser(this.traverser);
return result;
}
ВОПРОС
- Правильно ли, что CL не обрабатывает элементы до конца источника?
- Правильно ли использование приоритета + распределенный + широковещательный в CL Vertex?
UPDATE
Кажется, что completeEdge на CL edge 1 никогда не вызывается.Кто-то может сказать мне, почему?
Спасибо!