Блок захвата не вызывается при исключении Groovy в процессоре NiFi ExecuteScript - PullRequest
0 голосов
/ 04 октября 2018

Я использую NiFi ExecuteScript для вызова Groovy-скрипта, который извлекает текст из PDF-файлов.Когда не удается извлечь, предполагается, что возникнет исключение, а файл потока перенаправляется в REL_FAILURE.Некоторые PDF-файлы проходят нормально, а некоторые выдают ошибку:

ExecuteScript[id=9a39e0cb-ebcc-31e4-a169-575e367046e9] Failed to process session due to javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=2d6540f7-b7a2-48c7-8978-6b90bbfb0ff5,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1538596326047-12, container=default, section=12], offset=2134, length=930225],offset=0,name=1  i-9 INS rev 87   05-07-87.pdf,size=930225] already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: StandardFlowFileRecord[uuid=2d6540f7-b7a2-48c7-8978-6b90bbfb0ff5,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1538596326047-12, container=default, section=12], offset=2134, length=930225],offset=0,name=1  i-9 INS rev 87   05-07-87.pdf,size=930225] already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed

Мой (упрощенный) код приведен ниже:

def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, { inputStream, outputStream ->
    try {
        // Load PDF from inputStream and parses text into a JSON string

        // If nothing can be extracted, throw an exception so the flowfile
        // can be routed to REL_FAILURE and processed further down the NiFi pipeline
        if(outputLength < 15) {
            throw new Exception('No output, send to REL_FAILURE')
        }

        // Write the string to the flowFile to be transferred
        outputStream.write(json.getBytes(StandardCharsets.UTF_8))
    } catch (Exception e){
        System.out.println(e.getMessage())
        session.transfer(flowFile, REL_FAILURE)
    }
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

Он довольно близко следует кулинарной книге , опубликованной вФорум сообщества Hortonworks s, и автор даже упоминает, что закрытие обрабатывается автоматически.

Я думаю, что ошибка вызвана тем, что PDF не удается обработать.Это вызывает исключение, которое должно быть перехвачено в try{}catch{} и затем передано в REL_FAILURE.Вместо этого кажется, что catch{} никогда не вызывается, поэтому объект outputStream никогда не закрывается.Он работает, как и ожидалось, и замечательно работает, когда я запускаю тот же код Groovy за пределами NiFi.

Если вы хотите попробовать запустить его на своем собственном сервере

Шаблон NiFi

полный код Groovy .

Образец PDF

1 Ответ

0 голосов
/ 04 октября 2018

Try / catch должен быть вне вызова session.write (), а не в обратном вызове.Внутри обратного вызова, бросьте IOException, а не Exception, которое должно распространяться вверх через session.write () и должно вводить ваше предложение catch снаружи.Затем вы можете перенести файл потока на сбой (вам не должно быть разрешено передавать файл потока во время записи в него).

...