Пакетная обработка потоковых файлов в apache nifi - PullRequest
1 голос
/ 26 апреля 2019

Я написал собственный процессор nifi, который пытается пакетно обрабатывать файлы входного потока.

Однако, похоже, он не ведет себя так, как ожидалось.Вот что происходит:

Я копирую и вставляю некоторые файлы на сервер.FethFromServerProcessor получает эти файлы с сервера и помещает их в queue1.MyCustomProcessor читает файлы в пакетном режиме из queue1.У меня есть свойство batchSize, определенное для MyCustomProcessor, и внутри его метода onTrigger() я получаю все файлы потока из queue1 в текущем пакете, выполнив следующее:

session.get(context.getProperty(batchSize).asInteger())

Первая строка onTrigger() создает временную метку и добавляет эту временную метку ко всем файлам потока.Таким образом, все файлы в пакете должны иметь одну и ту же метку времени.Однако этого не происходит.Обычно первый файл потока получает одну временную метку, а остальные файлы потока получают другую временную метку.

Кажется, что когда FetchFromServerProcessor получает первый файл с сервера и помещает его в queue1, MyCustomProcessorсрабатывает и получает все файлы из очереди.Между прочим, бывает, что раньше был один файл, который в этом пакете выбран как единственный файл.К тому времени, когда MyCustomProcessor обработал этот файл, FetchFromServerProcessor извлек все файлы с сервера и поместил их в queue1.Таким образом, после обработки первого файла, MyCustomProcessor берет все файлы в queue1 и формирует второй пакет, тогда как я хочу, чтобы все файлы собирались в один пакет.

Как можно избежать формирования двух пакетов?Я вижу, что люди обсуждают ожидание-уведомление в этом контексте: 1 , 2 .Но я не могу быстро разобраться в этих постах.Может ли кто-нибудь дать мне минимальные шаги для достижения этого с помощью процессоров ожидания уведомлений или кто-то может указать мне на минимальный учебник, который дает пошаговую процедуру для использования процессоров ожидания уведомлений?Кроме того, объяснил ли я стандартный шаблон ожидания-уведомления для решения проблемы, связанной с партиями?Или есть какой-то другой стандартный подход, чтобы сделать это?

1 Ответ

0 голосов
/ 27 апреля 2019

Звучит так, как будто этот размер пакета является необходимым числом входящих потоковых файлов до CustomProcessor, поэтому почему бы не написать CustomProcessor#onTrigger() следующим образом:

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final ComponentLog logger = getLogger();
    // Try to get n flowfiles from incoming queue
    final Integer desiredFlowfileCount = context.getProperty(batchSize).asInteger();
    final int queuedFlowfileCount = session.getQueueSize().getObjectCount();
    if (queuedFlowfileCount < desiredFlowfileCount) {
        // There are not yet n flowfiles queued up, so don't try to run again immediately
        if (logger.isDebugEnabled()) {
            logger.debug("Only {} flowfiles queued; waiting for {}", new Object[]{queuedFlowfileCount, desiredFlowfileCount});
        }
        context.yield();
        return;
    }

    // If we're here, we do have at least n queued flowfiles
    List<FlowFile> flowfiles = session.get(desiredFlowfileCount);

    try {
        // TODO: Perform work on all flowfiles
        flowfiles = flowfiles.stream().map(f -> session.putAttribute(f, "timestamp", "my static timestamp value")).collect(Collectors.toList());
        session.transfer(flowfiles, REL_SUCCESS);

        // If extending AbstractProcessor, this is handled for you and you don't have to explicitly commit
        session.commit();
    } catch (Exception e) {
        logger.error("Helpful error message");
        if (logger.isDebugEnabled()) {
            logger.error("Further stacktrace: ", e);
        }
        // Penalize the flowfiles if appropriate (also done for you if extending AbstractProcessor and an exception is thrown from this method
        session.rollback(true);
        //  --- OR ---
        // Transfer to failure if they can't be retried
        session.transfer(flowfiles, REL_FAILURE);
    }
}

Синтаксис Java 8 stream можно заменить этим, если он незнаком:

        for (int i = 0; i < flowfiles.size(); i++) {
            // Write the same timestamp value onto all flowfiles
            FlowFile f = flowfiles.get(i);
            flowfiles.set(i, session.putAttribute(f, "timestamp", "my timestamp value"));
        }

Семантика между штрафовкой (сообщающей процессору о задержке выполнения работы над конкретным потоковым файлом) и уступкой (говорящей процессору о необходимости подождать некоторое время, чтобы попытаться выполнить какую-либо работу снова) важна.

Возможно, вы также хотите, чтобы на вашем пользовательском процессоре была аннотация @TriggerSerially , чтобы убедиться, что у вас не запущено несколько потоков, так что может возникнуть состояние состязания.

...