Я занимаюсь разработкой собственного процессора для Apache NiFi.Я создал nar моего процессора и поместил его в папку lib nifi и запустил nifi.Я настроил удаленный отладчик в eclipse и включил точку останова в первой строке onTrigger()
.Во время отладки я запускаю один процессор за раз в моем конвейере nifi.Я могу найти один файл потока в очереди ввода моего пользовательского процессора, однако мой пользовательский процессор не получает никакого файла потока.Когда я запускаю свой пользовательский процессор, он достигает точки останова в методе onTrigger()
.Внутри этого метода, когда я делаю:
public class MyCustomProc extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
List<FlowFile> flowFiles = session.get(5000);
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
//...
flowFiles
оказывается нулевого размера !!! Я не могу угадать, в каком направлении я должен проверить, чтобы выяснить, почему это происходит.Любой намек, как я могу диагностировать это?
Редактировать
Stacktrace
2019-05-02 18:08:09,456 ERROR [Timer-Driven Process Thread-10] c.c.product.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified
at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
PS1: Этот метод возвращает сразу из тела if
, что дает мне следующее исключение:
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord transfer relationship not specified
Это исключение повторяется всегда, так как файл потока во входной очереди моегоКастомный процессор.
PS2: Я получаю следующую ошибку в файле apps.log, хотя я не уверен, является ли это источником проблемы:
2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] o.a.nifi.groups.StandardProcessGroup Unable to synchronize StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-56794af6555c] with Flow Registry because Process Group was placed under Version Control using Flow Registry with identifier 80016ab0-bfab-152b-ffff-ffffc441867c but cannot find any Flow Registry with this identifier