Доступ к происхождению файла потока NiFi - PullRequest
0 голосов
/ 29 ноября 2018

Я разрабатываю какую-то обработку ошибок для потоковых файлов для NiFi, например, подсистема базы данных отказывается записывать данные из потокового файла, потому что данные не такие, как ожидалось, потому что исходная система этих данныхотсутствуют некоторые основные данные.
Таким образом, эта обработка ошибок записывает данные в MongoDB с дополнительной информацией о том, что пошло не так.Для этого я написал InvokeScriptedProcessor со скриптом Groovy, чтобы добиться этого.

Вот важная часть скрипта:

ArrayList getStacktrace(flowfileUuid){
    def lineage = this.provenanceRepository.createLineageQuery(flowfileUuid)
    def lineageData = this.provenanceRepository.getLineageData(lineage.id)

    if (lineageData.results == null || lineageData.results.nodes.size() == 0){
        println "cannot find stacktrace for ${flowfileUuid}."
        return []
    }
    def eventIds = lineageData.results.nodes.findAll {n -> n.type == 'EVENT'}.collect {n -> n.id }.sort()
    def provenanceEvents = []
    for (eventId in eventIds){
        provenanceEvents << this.provenanceRepository.getProvenanceEvent(eventId).provenanceEvent.componentName
    }
    this.provenanceRepository.deleteLineageQuery(lineage.id)
    return provenanceEvents
}

Для createLineageQuery Я POSTING к nifi-api с /nifi-api/provenance/lineage добавлением uuid изфайл потока в теле.Результатом является, среди прочего, ID запроса.Я использую это ID до getLineageData;есть также свойство finished, и я жду, пока запрос не будет завершен.
С этими данными линии я getProvenanceEvent данных и записываю имя компонента (процессора) в массив.
После этогоI deleteLineageQuery, как указано в документации.

Так что это будет моя трассировка стека.

Теперь проблема в том, что данные о происхождении пустые, когда файл потока впервые достигает этого InvokeScriptedProcessor.Я перепробовал много вещей, таких как ожидание и прочее.Не помогает.
Теперь странно то, что данные о происхождении не пустые, когда я повторяю файл потока для этого процессора.
Таким образом, поведение не является детерминированным, как я ожидаю.
Иногда данные о происхождении не пустые, когда я впервые обрабатываю файл потока.
Я тоже пробовал это сделать с Fiddler, там он работал все время.

Есть ли проблема смой подход?
Я в настоящее время использую NiFi 1.6.0.

РЕДАКТИРОВАТЬ:
Я приму ответ Брайана в качестве решения.
Я буду исследоватьэто как только у меня есть время, но звучит правильно.Тем не менее, я попробовал свое решение с NiFi 1.8.0, и оно работает как задумано.Так что в настоящее время я в порядке с тем, как я реализовал это на первом этапе, но я улучшу свое решение с предложением Брайана.

1 Ответ

0 голосов
/ 29 ноября 2018

Я не совсем уверен, в чем проблема, но в общем случае данные о происхождении на самом деле не предназначены для доступа к процессору, поэтому в сеансе или в контексте отсутствует API, позволяющий получать события провенанса,разрешено только создание событий.

Чтобы выполнить запрос о происхождении, события должны быть проиндексированы, и нет никаких гарантий относительно того, когда произойдет индексация, связанная с обработкой файла потока.Таким образом, возможно, что события еще не видны.

ReportingTask - это предполагаемый способ доступа к событиям происхождения, и его можно использовать для передачи их из NiFi в какую-либо внешнюю систему для более длительного хранения.

...