Файлы нулевого потока в методе onTrigger () AbstractProcessor из Apache Nifi - PullRequest
0 голосов
/ 02 мая 2019

Я занимаюсь разработкой собственного процессора для Apache NiFi.Я создал nar моего процессора и поместил его в папку lib nifi и запустил nifi.Я настроил удаленный отладчик в eclipse и включил точку останова в первой строке onTrigger().Во время отладки я запускаю один процессор за раз в моем конвейере nifi.Я могу найти один файл потока в очереди ввода моего пользовательского процессора, однако мой пользовательский процессор не получает никакого файла потока.Когда я запускаю свой пользовательский процессор, он достигает точки останова в методе onTrigger().Внутри этого метода, когда я делаю:

public class MyCustomProc extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        List<FlowFile> flowFiles = session.get(5000);
        if (flowFiles == null || flowFiles.size() == 0) {
            return;
        }
        //...

flowFiles оказывается нулевого размера !!! Я не могу угадать, в каком направлении я должен проверить, чтобы выяснить, почему это происходит.Любой намек, как я могу диагностировать это?

Редактировать

Stacktrace

2019-05-02 18:08:09,456 ERROR [Timer-Driven Process Thread-10] c.c.product.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

PS1: Этот метод возвращает сразу из тела if, что дает мне следующее исключение:

org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord transfer relationship not specified

Это исключение повторяется всегда, так как файл потока во входной очереди моегоКастомный процессор.

PS2: Я получаю следующую ошибку в файле apps.log, хотя я не уверен, является ли это источником проблемы:

2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] o.a.nifi.groups.StandardProcessGroup Unable to synchronize StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-56794af6555c] with Flow Registry because Process Group was placed under Version Control using Flow Registry with identifier 80016ab0-bfab-152b-ffff-ffffc441867c but cannot find any Flow Registry with this identifier

1 Ответ

1 голос
/ 02 мая 2019

Это нормальное поведение иногда получать файлы с нулевым потоком, поэтому процессоры проверяют, что у вас есть в начале.

FlowFileHandlingException означает, что файл потока был получен из сеанса, либо из get, либо create, и этот файл потока никуда не был перенесен и не был удален, поэтому в основном он не учитывается.Этого не может быть, если просто вернуться в начало в операторе if, поэтому остальная часть кода процессора выполняется и выдает эту ошибку.Вы не предоставили остальную часть кода, поэтому мы не можем видеть проблему.

Вторая проблема довольно очевидна.У вас есть группа процессов под управлением версиями, но клиент реестра, который использовался для запуска контроля версий, больше не существует.Я не знаю, как вы создали этот сценарий, потому что я считаю, что пользовательский интерфейс / API не позволит вам удалить клиента реестра, у которого активные потоки находятся под контролем версий, но вы должны иметь возможность остановить контроль версий в группе процессов.

...