Apache NiFi: обработка нескольких CSV с использованием процессора ExecuteScript - PullRequest
1 голос
/ 05 ноября 2019

У меня есть CSV с 70 столбцами. 60-й столбец содержит значение, которое определяет, является ли запись valid или invalid. Если в 60-м столбце есть 0, 1, 6 или 7, то это valid. Если он содержит любое другое значение, то его invalid.

Я понял, что эта функциональность была невозможна, полностью полагаясь на изменение свойств процессоров в Apache NiFi. Поэтому я решил использовать executeScript processor и добавил этот код Python в качестве текста.

import csv

valid =0
invalid =0
total =0
file2 = open("invalid.csv","w")
file1 = open("valid.csv","w")

with  open('/Users/himsaragallage/Desktop/redder/NFV1MSS_Hutch_2019101812750.dat.csv') as f:
    r = csv.reader(f)
    for row in f:
        # print row[1]
        total +=1

        if row[59] == "0" or row[59] == "1" or row[59] == "6" or row[59] == "7":
            valid +=1
            file1.write(row)
        else:
            invalid += 1
            file2.write(row)
file1.close()
file2.close()
print("Total : " + str(total))
print("Valid : " + str(valid))
print("Invalid : " + str(invalid))

Я не знаю, как использовать сеанс и код в процессоре executeScript, как показано в thisвопрос . Поэтому я просто написал простой код на Python и направил действительные и недействительные данные в разные файлы. Этот подход, который я использовал, имеет много ограничений .

  1. Я хочу иметь возможность динамически обрабатывать csv с разными именами файлов.
  2. csv, для которого недопустимы данныеотправлено, также должно иметь то же имя файла, что и входные данные csv.
  3. В моей папке redder будет около 20 csv. Все они должны быть обработаны за один раз.

Надеюсь, вы могли бы предложить мне способ сделать следующее. Не стесняйтесь предложить мне решение, отредактировав код Python, который я использовал, или даже полностью используя другой набор процессоров и полностью исключив использование ExecuteScript Processer

Ответы [ 2 ]

2 голосов
/ 06 ноября 2019

Вот полные пошаговые инструкции по , как использовать QueryRecord процессор

В основном вам необходимо настроить выделенные свойства

enter image description here

1 голос
/ 07 ноября 2019

Вы хотите направить записи на основе значений из одного столбца. Есть разные способы сделать это в NiFi. Я могу думать о следующем:

  • Использование QueryRecord процессор для разделения записей по значениям столбцов
  • Использование RouteOnContent процессор для маршрутизации с использованием регулярного выражения
  • Использование ExecuteScript процессор для создания пользовательской логики маршрутизации
  • Использование PartitionRecord процессор для маршрутизации на основе RecordPaths

Я покажу вам, как решить вашу проблему, используя PartitionRecordпроцессор. Поскольку вы не предоставили никаких примеров данных, я создал пример варианта использования. Я хочу отличить города в Европе от городов в других местах. Даны следующие данные:

id,city,country
1,Berlin,Germany
2,Paris,France
3,New York,USA
4,Frankfurt,Germany

Поток:

enter image description here

GenerateFlowFile:

enter image description here

PartitionRecord:

enter image description here

CSVReader должен быть настроен на вывод схемы и CSVRecordSetWriter для наследования схемы. PartitionRecord сгруппирует записи по странам и передаст их вместе с атрибутом country, который имеет значение страны. Вы увидите следующие группы записей:

id,city,country
1,Berlin,Germany
4,Frankfurt,Germany

id,city,country
2,Paris,France

id,city,country
3,New York,USA

Каждая группа является потоковым файлом и будет иметь атрибут страны, который вы будете использовать для маршрутизации групп.

RouteOnAttribute:

enter image description here

Все страны Европы будут перенаправлены на отношения is_europe. Теперь вы можете применить ту же стратегию к вашему варианту использования.

...