TL; DR Обходной путь должен использовать несколько SplitTexts, первый из которых разбивается, например, на 10 тыс. Строк, а второй - на 1000 строк. Затем первые 10 тыс. Строк будут разбиты на 10 потоковых файлов и отправлены в нисходящий поток, а вторые 10 тыс. Строк обрабатываются вторым SplitText.
РЕДАКТИРОВАТЬ : добавление другого обходного пути, скрипта Groovy, который будет использоваться в InvokeScriptedProcessor:
class GroovyProcessor implements Processor {
def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
def REL_ORIGINAL = new Relationship.Builder().name("original").description('After processing, the original incoming FlowFiles are routed here').build()
def ComponentLog log
void initialize(ProcessorInitializationContext context) { log = context.logger }
Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS, REL_ORIGINAL] as Set }
Collection<ValidationResult> validate(ValidationContext context) { null }
PropertyDescriptor getPropertyDescriptor(String name) { null }
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
List<PropertyDescriptor> getPropertyDescriptors() { null }
String getIdentifier() { null }
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
def session1 = sessionFactory.createSession()
def session2 = sessionFactory.createSession()
try {
def inFlowFile = session1.get()
if(!inFlowFile) return
def inputStream = session1.read(inFlowFile)
inputStream.eachLine { line ->
def outFlowFile = session2.create()
outFlowFile = session2.write(outFlowFile, {outputStream ->
outputStream.write(line.bytes)
} as OutputStreamCallback)
session2.transfer(outFlowFile, REL_SUCCESS)
session2.commit()
}
inputStream.close()
session1.transfer(inFlowFile, REL_ORIGINAL)
session1.commit()
} catch (final Throwable t) {
log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
session2.rollback(true)
session1.rollback(true)
throw t
}}}
processor = new GroovyProcessor()
Для полноты:
Процессоры Split были разработаны для поддержки шаблона Split / Merge, и для того, чтобы объединить их позже, каждый из них должен иметь одинаковый «родительский идентификатор», а также счетчик.
Если вы отправите потоковые файлы до того, как разбьете все на части, вы не узнаете общее количество и не сможете объединить их позже. Кроме того, если что-то идет не так с разделенной обработкой, вы можете «откатить» операцию вместо того, чтобы некоторые потоковые файлы уже были в нисходящем направлении, а остальные отправлены на сбой
Чтобы отправить некоторые потоковые файлы перед любой обработкой, вы должны "зафиксировать сеанс процесса". Это препятствует выполнению вышеуказанных действий и создает разрыв в происхождении для входящего файла потока, так как вы должны зафиксировать / передать этот файл в сеансе, в котором он изначально был принят. Для всех последующих фиксаций потребуются новые файлы потока, созданные , которая разрывает цепь происхождения / происхождения.
Несмотря на то, что для этого существует открытая версия Jira ( NIFI-2878 ), в списках рассылки были некоторые несогласованности и запросы на добавление этой функции в процессоры, которые принимают ввод (т.е. процессоры, не являющиеся исходными кодами) ). Платформа NiFi довольно транзакционна, и перед ней стоит такая особенность.