Я разрабатываю какую-то обработку ошибок для потоковых файлов для NiFi, например, подсистема базы данных отказывается записывать данные из потокового файла, потому что данные не такие, как ожидалось, потому что исходная система этих данныхотсутствуют некоторые основные данные.
Таким образом, эта обработка ошибок записывает данные в MongoDB с дополнительной информацией о том, что пошло не так.Для этого я написал InvokeScriptedProcessor со скриптом Groovy, чтобы добиться этого.
Вот важная часть скрипта:
ArrayList getStacktrace(flowfileUuid){
def lineage = this.provenanceRepository.createLineageQuery(flowfileUuid)
def lineageData = this.provenanceRepository.getLineageData(lineage.id)
if (lineageData.results == null || lineageData.results.nodes.size() == 0){
println "cannot find stacktrace for ${flowfileUuid}."
return []
}
def eventIds = lineageData.results.nodes.findAll {n -> n.type == 'EVENT'}.collect {n -> n.id }.sort()
def provenanceEvents = []
for (eventId in eventIds){
provenanceEvents << this.provenanceRepository.getProvenanceEvent(eventId).provenanceEvent.componentName
}
this.provenanceRepository.deleteLineageQuery(lineage.id)
return provenanceEvents
}
Для createLineageQuery
Я POSTING
к nifi-api с /nifi-api/provenance/lineage
добавлением uuid
изфайл потока в теле.Результатом является, среди прочего, ID
запроса.Я использую это ID
до getLineageData
;есть также свойство finished
, и я жду, пока запрос не будет завершен.
С этими данными линии я getProvenanceEvent
данных и записываю имя компонента (процессора) в массив.
После этогоI deleteLineageQuery
, как указано в документации.
Так что это будет моя трассировка стека.
Теперь проблема в том, что данные о происхождении пустые, когда файл потока впервые достигает этого InvokeScriptedProcessor
.Я перепробовал много вещей, таких как ожидание и прочее.Не помогает.
Теперь странно то, что данные о происхождении не пустые, когда я повторяю файл потока для этого процессора.
Таким образом, поведение не является детерминированным, как я ожидаю.
Иногда данные о происхождении не пустые, когда я впервые обрабатываю файл потока.
Я тоже пробовал это сделать с Fiddler, там он работал все время.
Есть ли проблема смой подход?
Я в настоящее время использую NiFi 1.6.0
.
РЕДАКТИРОВАТЬ:
Я приму ответ Брайана в качестве решения.
Я буду исследоватьэто как только у меня есть время, но звучит правильно.Тем не менее, я попробовал свое решение с NiFi 1.8.0
, и оно работает как задумано.Так что в настоящее время я в порядке с тем, как я реализовал это на первом этапе, но я улучшу свое решение с предложением Брайана.