В нифийском Groovy скрипте выдача исключения уже помечена для передачи - PullRequest
0 голосов
/ 06 октября 2019

У меня есть входная полезная нагрузка {"ENV": {"MAC": "6CECEB5D0302","NS": "NM","DATE": "170619","TIME": "114635","PM2.5": "10","PM10": "8.272681196722012"}}.

Я использовал приведенный ниже скрипт для создания нескольких потоковых файлов, и он работал нормально. Но тот же код не работает для передачи одного потокового файла. В приведенном ниже коде, основанном на коде if, который я хотел, скрипт должен либо передавать поток на следующий процессор, либо просто регистрировать информацию и удалять этот файл потока.

import groovy.json.*;
import org.apache.commons.io.IOUtils;
import java.nio.charset.*;
import java.lang.*;

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def flowFiles = [] as List<FlowFile>
flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        try{
            def data = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            def input = new JsonSlurper().parseText( data );
            if( Double.parseDouble(input.ENV["PM2.5"])>7.2) {
                Map out =
                    [ENVIRONMENT: [
                      DATE: input.ENV.DATE,
                      "PM2.5": input.ENV["PM2.5"],
                      TIME: input.ENV.TIME,
                      DATE_TIME: input.ENV.DATE,
                      NS: input.ENV.NS,
                      MAC: input.ENV.MAC
                    ]];
                String finalJson = JsonOutput.toJson out;
                outputStream.write(finalJson.getBytes(StandardCharsets.UTF_8))
                flowFiles << flowFile
            } else {
                log.info('else condition executed')
            }
        } catch(Exception e){}
    } as StreamCallback)
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

Ожидаемый результат:

{"ENVIRONMENT":{"DATE":"170619","PM2.5":"10","TIME":"114635","DATE_TIME":"170619","NS":"NM","MAC":"6CECEB5D0302"}}

Но если входное значение имеет PM2.5 меньше 7,2, он должен удалить этот потоковый файл.

После удаления пустого журнала исключений при попытках и отлове:

2019-10-07 12:08:29,237 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=716b08ff-7a78-3806-bdd6-99d57cfd7cb7] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:248)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:162)
    at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
    at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54)
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:232)
    ... 11 common frames omitted
Caused by: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:324)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159)
    ... 14 common frames omitted
Caused by: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3139)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3121)
    at org.apache.nifi.controller.repository.StandardProcessSession.remove(StandardProcessSession.java:1979)
    at org.apache.nifi.processor.ProcessSession$remove$3.call(Unknown Source)
    at Script107.run(Script107.groovy:40)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:321)
    ... 15 common frames omitted

Тот же кодЯ использовал для генерации одного потокового файла с несколькими выходными данными, это работает в обоих случаях 1) если он удовлетворяет условию, он сгенерирует несколько выходных данных на основе входных данных. 2) он пропадет, если не будет удовлетворен .:

Ввод:

 {
    "ENV": [{
            "MAC": "6CECEB5D0302",
            "NS": "NM",
            "DATE": "170619",
            "TIME": "114635",
            "PM2.5": "7.222410585417936",
            "PM10": "8.272681196722012"
        },
        {
            "MAC": "6CECEB5D0302",
            "NS": "NM",
            "DATE": "170619",
            "TIME": "114635",
            "PM2.5": "7.222410585417936",
            "PM10": "8.272681196722012"
        }
    ]
}

Приведенный выше ввод сгенерирует два файла потока, так как 7.22 ... больше 7.2. Если значение для PM2.5 меньше 7,2, тогда он не будет генерировать поток-файл передачи для следующего процессора, я имею в виду, что он сам там будет сброшен.

1 Ответ

0 голосов
/ 07 октября 2019

Проблема в вашем коде - вы transferring к успеху, а затем removing тот же файл потока.

Только одна из этих операций может быть применена к одному файлу.

Кажется, вы просто забыли создать новый файл

Попробуйте изменить эту часть кода:

String finalJson = JsonOutput.toJson out;
outputStream.write(finalJson.getBytes(StandardCharsets.UTF_8))
flowFiles << flowFile

на это:

def outFile = session.create(flowFile) //create new flow file
session.write().withWriter("UTF-8"){w-> new JsonBuilder(out).writeTo(w) }
flowFiles << outFile

Вы можете упроститьваш код. Следующий пример работает с ExecuteGroovyScript процессор

import groovy.json.*;

def flowFile = session.get();
if (flowFile == null)return;
//read and parse input file
def data = flowFile.read().withReader("UTF-8"){r-> new JsonSlurper().parse(r) }

data.ENV.each{item->
    if( (item."PM2.5" as Double) > 7.2  ){
        def outFile = flowFile.clone(false)     //clone flowfile without content
        def outData = [ENVIRONMENT: item]       //combine new json
        outFile.write("UTF-8"){w-> 
            new JsonBuilder(outData).writeTo(w) //write json as out file content
        }
        REL_SUCCESS << outFile                  //transfer to success
    }
}
flowFile.remove()
...