Процессор Nifi запускается дважды для одного файла потока ввода - PullRequest
0 голосов
/ 16 января 2019

Я сейчас новичок в Apache Nifi и все еще изучаю его. Я сделал специальный процессор, где я буду получать данные с сервера с нумерацией страниц. Я передаю входной файл, который будет содержать атрибут «URL». Наконец, перенесите ответ в файл выходного потока, так как я извлекаю данные с разбивкой на страницы, поэтому я создал новый файл выходного потока для каждой страницы и перенес его в Успешное отношение. Ниже приведена часть кода:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile incomingFlowFile = session.get();
    String api = null;
    if (incomingFlowFile == null) {
        logger.info("empty input flow file");
        session.commit();
        return;
    } else {
       api=incomingFlowFile.getAttribute("url");
    }

    session.remove(incomingFlowFile);
    if(api == null) {
        logger.warn("API url is null");
        session.commit();
        return;
    }

    int page = Integer.parseInt(context.getProperty(PAGE).getValue());

    while(page < 3) {
        try {
            String url = api + "&curpg=" + page;
            logger.info("input url is: {}", url);
            HttpResponse response = httpGetApiCall(url, 10000);
            if(response == null || response.getEntity() == null) {
                logger.warn("response null");
                session.commit();
                return;
            }

            String resp = EntityUtils.toString(response.getEntity());

            InputStream is = new ByteArrayInputStream(StandardCharsets.UTF_16.encode(resp).array());
            FlowFile outFlowFile = session.create();
            outFlowFile = session.importFrom(is, outFlowFile);
        session.transfer(outFlowFile, SUCCESSFUL);

        } catch (IOException e) {
            logger.warn("IOException :{}", e.getMessage());
            return;
        }
        ++page;
    }
    session.commit();
}

У меня проблема с тем, что для одного файла потока ввода этот процессор запускается дважды, и поэтому он генерирует 4 файла потока для одного файла потока ввода.

Я не могу понять, где я поступил неправильно. Пожалуйста, помогите в этом вопросе. Заранее спасибо.

=============================================== ======================= группа процессоров 1 (Nifi_Parvin)

группа процессоров 2 (News_Point_custom)

...