Как извлечь и манипулировать данными в процессоре Nifi - PullRequest
0 голосов
/ 25 июня 2018

Я пытаюсь написать собственный процессор Nifi, который будет принимать содержимое файла входящего потока, выполнять некоторые математические операции с ним, а затем записывать результаты в файл исходящего потока.Есть ли способ вывести содержимое файла входящего потока в строку или что-то?Я долго искал, и это не так просто.Если бы кто-нибудь мог указать мне на хороший учебник, который имеет дело с чем-то подобным, это было бы очень признательно.

1 Ответ

0 голосов
/ 26 июня 2018

Руководство разработчика Apache NiFi очень хорошо описывает процесс создания собственного процессора.В вашем конкретном случае я бы начал с раздела Component Lifecycle и шаблона Enrich / Modify Content .Любой другой процессор, который выполняет аналогичную работу (например, ReplaceText или Base64EncodeContent ), будет хорошим примером для изучения;весь исходный код доступен на GitHub .

По сути, вам необходимо реализовать метод #onTrigger() в классе вашего процессора, прочитать содержимое потокового файла и проанализировать его в ожидаемом формате, выполнить операции, а затем повторно заполнить результирующее содержимое потокового файла.Ваш исходный код будет выглядеть примерно так:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        AtomicBoolean error = new AtomicBoolean();
        AtomicReference<String> result = new AtomicReference<>(null);

        // This uses a lambda function in place of a callback for InputStreamCallback#process()
        processSession.read(flowFile, in -> {
            long start = System.nanoTime();

            // Read the flowfile content into a String
            // TODO: May need to buffer this if the content is large
            try {
                final String contents = IOUtils.toString(in, StandardCharsets.UTF_8);
                result.set(new MyMathOperationService().performSomeOperation(contents));

                long stop = System.nanoTime();
                if (getLogger().isDebugEnabled()) {
                    final long durationNanos = stop - start;
                    DecimalFormat df = new DecimalFormat("#.###");
                    getLogger().debug("Performed operation in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
                }
            } catch (Exception e) {
                error.set(true);
                getLogger().error(e.getMessage() + " Routing to failure.", e);
            }
        });

        if (error.get()) {
            processSession.transfer(flowFile, REL_FAILURE);
        } else {
            // Again, a lambda takes the place of the OutputStreamCallback#process()
            FlowFile updatedFlowFile = session.write(flowFile, (in, out) -> {
                final String resultString = result.get();
                final byte[] resultBytes = resultString.getBytes(StandardCharsets.UTF_8);

                // TODO: This can use a while loop for performance
                out.write(resultBytes, 0, resultBytes.length);
                out.flush();
            });
            processSession.transfer(updatedFlowFile, REL_SUCCESS);
        }
    }

Даггетт прав, что процессор ExecuteScript является хорошим местом для запуска, поскольку он сократит жизненный цикл разработки (без создания NAR, развертыванияи перезапустите NiFi, чтобы использовать его), и когда у вас будет правильное поведение, вы можете легко скопировать / вставить в сгенерированный скелет и развернуть его один раз.

...