Я сейчас новичок в Apache Nifi и все еще изучаю его.
Я сделал специальный процессор, где я буду получать данные с сервера с нумерацией страниц.
Я передаю входной файл, который будет содержать атрибут «URL».
Наконец, перенесите ответ в файл выходного потока, так как я извлекаю данные с разбивкой на страницы, поэтому я создал новый файл выходного потока для каждой страницы и перенес его в Успешное отношение.
Ниже приведена часть кода:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile incomingFlowFile = session.get();
String api = null;
if (incomingFlowFile == null) {
logger.info("empty input flow file");
session.commit();
return;
} else {
api=incomingFlowFile.getAttribute("url");
}
session.remove(incomingFlowFile);
if(api == null) {
logger.warn("API url is null");
session.commit();
return;
}
int page = Integer.parseInt(context.getProperty(PAGE).getValue());
while(page < 3) {
try {
String url = api + "&curpg=" + page;
logger.info("input url is: {}", url);
HttpResponse response = httpGetApiCall(url, 10000);
if(response == null || response.getEntity() == null) {
logger.warn("response null");
session.commit();
return;
}
String resp = EntityUtils.toString(response.getEntity());
InputStream is = new ByteArrayInputStream(StandardCharsets.UTF_16.encode(resp).array());
FlowFile outFlowFile = session.create();
outFlowFile = session.importFrom(is, outFlowFile);
session.transfer(outFlowFile, SUCCESSFUL);
} catch (IOException e) {
logger.warn("IOException :{}", e.getMessage());
return;
}
++page;
}
session.commit();
}
У меня проблема с тем, что для одного файла потока ввода этот процессор запускается дважды, и поэтому он генерирует 4 файла потока для одного файла потока ввода.
Я не могу понять, где я поступил неправильно.
Пожалуйста, помогите в этом вопросе.
Заранее спасибо.
=============================================== =======================
группа процессоров 1 (Nifi_Parvin)
группа процессоров 2 (News_Point_custom)