Процессор Nifi для оперативной обработки асинхронных задач - PullRequest
0 голосов
/ 09 мая 2018

У меня есть процессор Nifi, который вызывает внешнюю службу, которая может занять несколько дней, прежде чем результат будет возвращен. В течение этого времени процессор может периодически вызывать Thread.sleep () для освобождения процессора.

Проблема заключается в том, что даже если Thread.sleep () вызывается в методе onTrigger (), процессор NiFi не будет считывать и обрабатывать новые FlowFiles, поскольку ожидает завершения onTrigger (). С точки зрения NiFi, процессор все еще блокирует завершение асинхронного вызова.

Есть ли способ поддерживать параллелизм при выполнении асинхронных вызовов в методе onTrigger () процессора NiFi?

1 Ответ

0 голосов
/ 11 мая 2018

Предложение Val Bonn отправить асинхронные FlowFiles обратно в очередь WAIT работает хорошо. По мере поступления асинхронных запросов объекты Java-процесса создаются и хранятся в памяти. Затем FlowFile направляется в отношение WAIT, которое снова подключается к процессору. Периодически FlowFiles из очереди WAIT проверяются на соответствие соответствующему процессу, чтобы увидеть, завершен ли он, и затем направляются в отношение SUCCESS, в противном случае они штрафуются. Это позволяет запускать многие долго выполняющиеся асинхронные процессы без выделения ценных ресурсов ЦП для каждого входящего запроса. Одним из источников сложности была обработка отключений процессора, вызванных из пользовательского интерфейса. В этих ситуациях вызывается метод onStopped, который ожидает завершения всех процессов в памяти и архивирует stderr и stdout на диск. Когда процессор запускается снова, архив считывается и сопоставляется с любыми потоками файлов в очереди WAIT.

...