Нифи: Как объединить поток-файл с уже существующими таблицами в каталоге? - PullRequest
0 голосов
/ 24 мая 2019

Это вопрос о Нифи.

Я создал конвейер Nifi для преобразования потокового файла в формате xml в формат csv.Теперь я хотел бы объединить или объединить преобразованный потоковый файл csv в существующие таблицы по имени файла (что также означает имя таблицы).

Проще говоря, мой процессор работает следующим образом.

  1. GetFile (из определенного каталога) -> 2. Преобразовать xml в csv -> 3. Обновите файл потока с именем таблицы -> 4. PutFile (в другой каталог)

Но в конце потока процессор PutFile выдает ошибку, говоря, что «файл с таким именем уже существует».

Я понятия не имею, как можно добавить flowfile в существующую таблицу csv.Любые советы, советы, идеи приветствуются.

Заранее спасибо.

1 Ответ

2 голосов
/ 24 мая 2019

поддержка добавления файла отсутствует, однако вы можете использовать ExecuteGroovyScript для этого:

def ff=session.get()
if(!ff)return
ff.read().withStream{s->
    String path = "./out_folder/${ff.filename}"
    //sync on file path to avoid conflict on same file writing (hope)
    synchronized(path){  
        new File( path ).append(s)
    }
}
REL_SUCCESS << ff

, если вам нужно работать с текстовым (читательским) контентом, а не с байтовым (потоковым) контентом

В следующем примере показано, как исключить 1 строку заголовка из файла потока, если файл назначения уже существует

def ff=session.get()
if(!ff)return
ff.read().withReader("UTF-8"){r->
    String path = "./.data/${ff.filename}"
    //sync on file path to avoid conflict on same file writing (hope)
    synchronized(path){
        def fout = new File( path )
        if(fout.exists())r.readLine() //skip 1 line (header) only if out file already exists
        fout.append(r) //append to the file the rest of reader content
    }
}
REL_SUCCESS << ff
...