NiFi: отправить поток файлов @OnflowFile - PullRequest
0 голосов
/ 30 апреля 2018

Можно ли отправить потоковый файл на аннотацию @OnStopped?

По сути, я хочу написать собственный процессор, который может отправить атрибут в flowFile при остановке процессора.

Есть предложения?

Я пытался ниже: -

ProcessSession session;
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
    if (flowFile == null) {
            flowFile = session.create();
        }
    flowFile = session.putAttribute(flowFile, "ATTRIBUTE_SIGNAL", "start");
    session.transfer(flowFile, success);
}
@OnStopped
   public void sendStop() {
    FlowFile flowFile = session.get();
    flowFile = session.create();
    flowFile = session.putAttribute(flowFile, "ATTRIBUTE_SIGNAL", "stop");
    session.transfer(flowFile, success);

   }

но происходит сбой в 2018-04-30 20: 44: 25,540 ОШИБКА [StandardProcessScheduler Thread-3] org.apache.nifi.util.ReflectionUtils Ошибка при вызове аннотированного метода public void com.kotak.nifi.processors. streaming.SignalGenerator.sendStop () 'с аргументами' [] '. java.lang.reflect.InvocationTargetException: null.

1 Ответ

0 голосов
/ 30 апреля 2018

Методы жизненного цикла, такие как OnScheduled / OnStopped / etc, на самом деле не предназначены для создания потоковых файлов, поэтому у вас нет доступа к ProcessSession, только у onTrigger.

Обычно предполагается, что процессоры слабо связаны, когда один процессор на самом деле не знает / не заботится о других процессорах, он просто извлекает файлы потока из очереди и обрабатывает их.

Технически вы можете достичь желаемого, сохранив ссылку на ProcessSession, полученную вами в onTrigger, в переменную-член процессора, чтобы вы могли использовать ее позже в OnStopped.

...