Apache Nifi - когда я использую SplitText для больших файлов, как я могу заставить записанные файлы записывать немедленно - PullRequest
0 голосов
/ 05 ноября 2018

Я читаю в текстовых файлах с 50 тыс. Строк данных, где каждая строка представляет собой полную запись.

Наш поток Nifi использует SplitText для обработки файла в пакетах по 1000 строк. (Это было установлено до моего времени для проблем с памятью, как мне сказали)

Возможно ли немедленное выполнение PutFile? Я хочу, чтобы файлы сразу выводили запись PutFile, а не просто стояли в очереди, ожидая обработки всех строк данных размером более 50 000+. Кажется довольно глупым делать это, если его разделяют.

Я читал документацию, но не могу найти, если это сделано специально и не настраивается.

Ценю любое руководство по документации, которое может помочь ответить / настроить мой поток.

1 Ответ

0 голосов
/ 05 ноября 2018

TL; DR Обходной путь должен использовать несколько SplitTexts, первый из которых разбивается, например, на 10 тыс. Строк, а второй - на 1000 строк. Затем первые 10 тыс. Строк будут разбиты на 10 потоковых файлов и отправлены в нисходящий поток, а вторые 10 тыс. Строк обрабатываются вторым SplitText.

РЕДАКТИРОВАТЬ : добавление другого обходного пути, скрипта Groovy, который будет использоваться в InvokeScriptedProcessor:

class GroovyProcessor implements Processor {
    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    def REL_ORIGINAL = new Relationship.Builder().name("original").description('After processing, the original incoming FlowFiles are routed here').build()
    def ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS, REL_ORIGINAL] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }    
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session1 = sessionFactory.createSession()
        def session2 = sessionFactory.createSession()
        try {
            def inFlowFile = session1.get()
            if(!inFlowFile) return
            def inputStream = session1.read(inFlowFile)
            inputStream.eachLine { line -> 
               def outFlowFile = session2.create()
               outFlowFile = session2.write(outFlowFile, {outputStream -> 
                   outputStream.write(line.bytes)
               } as OutputStreamCallback)
               session2.transfer(outFlowFile, REL_SUCCESS)
               session2.commit()
            }
            inputStream.close()
            session1.transfer(inFlowFile, REL_ORIGINAL)
            session1.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session2.rollback(true)
            session1.rollback(true)
            throw t
}}}
processor = new GroovyProcessor()

Для полноты:

Процессоры Split были разработаны для поддержки шаблона Split / Merge, и для того, чтобы объединить их позже, каждый из них должен иметь одинаковый «родительский идентификатор», а также счетчик.

Если вы отправите потоковые файлы до того, как разбьете все на части, вы не узнаете общее количество и не сможете объединить их позже. Кроме того, если что-то идет не так с разделенной обработкой, вы можете «откатить» операцию вместо того, чтобы некоторые потоковые файлы уже были в нисходящем направлении, а остальные отправлены на сбой

Чтобы отправить некоторые потоковые файлы перед любой обработкой, вы должны "зафиксировать сеанс процесса". Это препятствует выполнению вышеуказанных действий и создает разрыв в происхождении для входящего файла потока, так как вы должны зафиксировать / передать этот файл в сеансе, в котором он изначально был принят. Для всех последующих фиксаций потребуются новые файлы потока, созданные , которая разрывает цепь происхождения / происхождения.

Несмотря на то, что для этого существует открытая версия Jira ( NIFI-2878 ), в списках рассылки были некоторые несогласованности и запросы на добавление этой функции в процессоры, которые принимают ввод (т.е. процессоры, не являющиеся исходными кодами) ). Платформа NiFi довольно транзакционна, и перед ней стоит такая особенность.

...