У меня есть входная полезная нагрузка {"ENV": {"MAC": "6CECEB5D0302","NS": "NM","DATE": "170619","TIME": "114635","PM2.5": "10","PM10": "8.272681196722012"}}
.
Я использовал приведенный ниже скрипт для создания нескольких потоковых файлов, и он работал нормально. Но тот же код не работает для передачи одного потокового файла. В приведенном ниже коде, основанном на коде if
, который я хотел, скрипт должен либо передавать поток на следующий процессор, либо просто регистрировать информацию и удалять этот файл потока.
import groovy.json.*;
import org.apache.commons.io.IOUtils;
import java.nio.charset.*;
import java.lang.*;
def flowFile = session.get();
if (flowFile == null) {
return;
}
def flowFiles = [] as List<FlowFile>
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
try{
def data = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
def input = new JsonSlurper().parseText( data );
if( Double.parseDouble(input.ENV["PM2.5"])>7.2) {
Map out =
[ENVIRONMENT: [
DATE: input.ENV.DATE,
"PM2.5": input.ENV["PM2.5"],
TIME: input.ENV.TIME,
DATE_TIME: input.ENV.DATE,
NS: input.ENV.NS,
MAC: input.ENV.MAC
]];
String finalJson = JsonOutput.toJson out;
outputStream.write(finalJson.getBytes(StandardCharsets.UTF_8))
flowFiles << flowFile
} else {
log.info('else condition executed')
}
} catch(Exception e){}
} as StreamCallback)
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
Ожидаемый результат:
{"ENVIRONMENT":{"DATE":"170619","PM2.5":"10","TIME":"114635","DATE_TIME":"170619","NS":"NM","MAC":"6CECEB5D0302"}}
Но если входное значение имеет PM2.5
меньше 7,2, он должен удалить этот потоковый файл.
После удаления пустого журнала исключений при попытках и отлове:
2019-10-07 12:08:29,237 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=716b08ff-7a78-3806-bdd6-99d57cfd7cb7] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:248)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:162)
at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54)
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:232)
... 11 common frames omitted
Caused by: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:324)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159)
... 14 common frames omitted
Caused by: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3139)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3121)
at org.apache.nifi.controller.repository.StandardProcessSession.remove(StandardProcessSession.java:1979)
at org.apache.nifi.processor.ProcessSession$remove$3.call(Unknown Source)
at Script107.run(Script107.groovy:40)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:321)
... 15 common frames omitted
Тот же кодЯ использовал для генерации одного потокового файла с несколькими выходными данными, это работает в обоих случаях 1) если он удовлетворяет условию, он сгенерирует несколько выходных данных на основе входных данных. 2) он пропадет, если не будет удовлетворен .:
Ввод:
{
"ENV": [{
"MAC": "6CECEB5D0302",
"NS": "NM",
"DATE": "170619",
"TIME": "114635",
"PM2.5": "7.222410585417936",
"PM10": "8.272681196722012"
},
{
"MAC": "6CECEB5D0302",
"NS": "NM",
"DATE": "170619",
"TIME": "114635",
"PM2.5": "7.222410585417936",
"PM10": "8.272681196722012"
}
]
}
Приведенный выше ввод сгенерирует два файла потока, так как 7.22 ... больше 7.2. Если значение для PM2.5
меньше 7,2, тогда он не будет генерировать поток-файл передачи для следующего процессора, я имею в виду, что он сам там будет сброшен.