Руководство разработчика Apache NiFi очень хорошо описывает процесс создания собственного процессора.В вашем конкретном случае я бы начал с раздела Component Lifecycle и шаблона Enrich / Modify Content .Любой другой процессор, который выполняет аналогичную работу (например, ReplaceText или Base64EncodeContent ), будет хорошим примером для изучения;весь исходный код доступен на GitHub .
По сути, вам необходимо реализовать метод #onTrigger()
в классе вашего процессора, прочитать содержимое потокового файла и проанализировать его в ожидаемом формате, выполнить операции, а затем повторно заполнить результирующее содержимое потокового файла.Ваш исходный код будет выглядеть примерно так:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
AtomicBoolean error = new AtomicBoolean();
AtomicReference<String> result = new AtomicReference<>(null);
// This uses a lambda function in place of a callback for InputStreamCallback#process()
processSession.read(flowFile, in -> {
long start = System.nanoTime();
// Read the flowfile content into a String
// TODO: May need to buffer this if the content is large
try {
final String contents = IOUtils.toString(in, StandardCharsets.UTF_8);
result.set(new MyMathOperationService().performSomeOperation(contents));
long stop = System.nanoTime();
if (getLogger().isDebugEnabled()) {
final long durationNanos = stop - start;
DecimalFormat df = new DecimalFormat("#.###");
getLogger().debug("Performed operation in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
}
} catch (Exception e) {
error.set(true);
getLogger().error(e.getMessage() + " Routing to failure.", e);
}
});
if (error.get()) {
processSession.transfer(flowFile, REL_FAILURE);
} else {
// Again, a lambda takes the place of the OutputStreamCallback#process()
FlowFile updatedFlowFile = session.write(flowFile, (in, out) -> {
final String resultString = result.get();
final byte[] resultBytes = resultString.getBytes(StandardCharsets.UTF_8);
// TODO: This can use a while loop for performance
out.write(resultBytes, 0, resultBytes.length);
out.flush();
});
processSession.transfer(updatedFlowFile, REL_SUCCESS);
}
}
Даггетт прав, что процессор ExecuteScript является хорошим местом для запуска, поскольку он сократит жизненный цикл разработки (без создания NAR, развертыванияи перезапустите NiFi, чтобы использовать его), и когда у вас будет правильное поведение, вы можете легко скопировать / вставить в сгенерированный скелет и развернуть его один раз.