Звучит так, как будто этот размер пакета является необходимым числом входящих потоковых файлов до CustomProcessor
, поэтому почему бы не написать CustomProcessor#onTrigger()
следующим образом:
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
// Try to get n flowfiles from incoming queue
final Integer desiredFlowfileCount = context.getProperty(batchSize).asInteger();
final int queuedFlowfileCount = session.getQueueSize().getObjectCount();
if (queuedFlowfileCount < desiredFlowfileCount) {
// There are not yet n flowfiles queued up, so don't try to run again immediately
if (logger.isDebugEnabled()) {
logger.debug("Only {} flowfiles queued; waiting for {}", new Object[]{queuedFlowfileCount, desiredFlowfileCount});
}
context.yield();
return;
}
// If we're here, we do have at least n queued flowfiles
List<FlowFile> flowfiles = session.get(desiredFlowfileCount);
try {
// TODO: Perform work on all flowfiles
flowfiles = flowfiles.stream().map(f -> session.putAttribute(f, "timestamp", "my static timestamp value")).collect(Collectors.toList());
session.transfer(flowfiles, REL_SUCCESS);
// If extending AbstractProcessor, this is handled for you and you don't have to explicitly commit
session.commit();
} catch (Exception e) {
logger.error("Helpful error message");
if (logger.isDebugEnabled()) {
logger.error("Further stacktrace: ", e);
}
// Penalize the flowfiles if appropriate (also done for you if extending AbstractProcessor and an exception is thrown from this method
session.rollback(true);
// --- OR ---
// Transfer to failure if they can't be retried
session.transfer(flowfiles, REL_FAILURE);
}
}
Синтаксис Java 8 stream
можно заменить этим, если он незнаком:
for (int i = 0; i < flowfiles.size(); i++) {
// Write the same timestamp value onto all flowfiles
FlowFile f = flowfiles.get(i);
flowfiles.set(i, session.putAttribute(f, "timestamp", "my timestamp value"));
}
Семантика между штрафовкой (сообщающей процессору о задержке выполнения работы над конкретным потоковым файлом) и уступкой (говорящей процессору о необходимости подождать некоторое время, чтобы попытаться выполнить какую-либо работу снова) важна.
Возможно, вы также хотите, чтобы на вашем пользовательском процессоре была аннотация @TriggerSerially
, чтобы убедиться, что у вас не запущено несколько потоков, так что может возникнуть состояние состязания.