Apache NiFi: как сравнить несколько строк в CSV и создать новый столбец - PullRequest
1 голос
/ 30 октября 2019

У меня есть CSV, который выглядит следующим образом.

Jc,TXF,timer,alpha,beta
15,44,55,12,33
18,87,33,111
9,87,61,29,77

Альфа и Бета вместе составляют код города. Я хочу добавить название города в CSV в качестве нового столбца.

Jc,TXF,timer,alpha,beta,city
15,44,55,12,33,York
18,87,33,111,London
9,87,61,29,77,Sydney

У меня есть другой CSV с только столбцами alpha,beta,city. Что выглядит следующим образом:

alpha,beta,city
12,33,York
33,111,London
29,77,Sydney

Как я могу добиться этого с помощью Apache NiFi. Пожалуйста, предложите процессоры и рабочий процесс, необходимые для достижения этой цели.

1 Ответ

2 голосов
/ 30 октября 2019

Я вижу два пути решения этой проблемы.

Сначала используйте CsvLookupService. Однако CsvLookupService поддерживает только один ключ, но у вас есть два, альфа и бета. Таким образом, чтобы использовать это решение, вы должны объединить оба ключа в один ключ, например 12_33.

Second, используя ExecuteScript процессор. Это лучше, потому что вам не нужно изменять исходные данные. Стратегия:

  1. Разделение текста CSV на строки
  2. Обогащение каждой строки столбцом города путем поиска альфа- и бета-ключей в файле сопоставления
  3. Объединениеотдельные строки в один файл CSV.

Общий поток:

enter image description here

GenerateFlowFile:

enter image description here

SplitText:

enter image description here

Установите header line count в 1, чтобы включить строку заголовка вразделить контент. Для процессора ExecuteScript установите python как scripting engine и укажите следующее script body:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import csv

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        # fetch the mapping CSV file
        with open('/home/nifi/mapping.csv', 'r') as mapping:
            # read the mapping file
            mappingContent = csv.reader(mapping, delimiter=',')
            # flowfile content is CSV text with two lines, header and actual content
            # split by newline to get access to each inidvidual line
            lines = IOUtils.toString(inputStream, StandardCharsets.UTF_8).split('\n')
            # the result will contain the header line 
            # the result will have the additional city column
            result = lines[0] + ',city\n'
            # take the second line and split it
            # to get access to alpha, beta and city values
            lineSplit = lines[1].split(',')

            # Go through the mapping file
            # item[0] -> alpha
            # item[1] -> beta
            # item[2] -> city
            # See if you find alpha and beta on the line content
            for item in mappingContent:
                if item[0] == lineSplit[3] and item[1] == lineSplit[4]:
                    result += lines[1] + ',' + item[2]
                    break

            if result is None:
                raise Exception('No matching found.')
            else:
                outputStream.write(bytearray(result.encode('utf-8')))
# end class

flowFile = session.get()
if(flowFile != None):
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        session.transfer(flowFile, REL_FAILURE)

См. Комментарии для подробного описания сценария. /home/nifi/mapping.csv должно быть доступно на вашем экземпляре NiFi. Если вы хотите узнать больше о процессоре ExecuteScript, обратитесь к ExecuteScript Cookbook . Наконец, вы объединяете все строки в один файл CSV:

enter image description here

Установите программу чтения и записи CSV. Оставьте свои свойства по умолчанию. Настройте свойства MergeContent, чтобы контролировать, сколько строк должно быть в каждом результирующем файле CSV. Результат:

enter image description here

...