Соглашение об именах в NiFi - PullRequest
0 голосов
/ 03 ноября 2018

У меня есть поток GetFile-> ConvertRecord-> splittext-> PutdatabaseRecord . Мой CSV-файл, который я отправляю в GetFile, содержит следующие поля:

ID  TIME                      M00B01  M00B02  M00B03
1   2018-09-27 10:44:23.972   3242    35      335
2   2018-09-21 11:44:23.972   323     24      978

Скелет таблицы моей базы данных в MYSQL выглядит следующим образом:

Create table test(ID INT,TIME DATETIME(3),MxB01 INT,MxB02 INT,MxB03 INT);

Примечание: я заменил название заголовков на MxB00, MxB01 и т. Д.

У меня ошибка в процессоре convertRecord, когда я читаю как CSVReader и пишу как CSVSetWritter. Я прилагаю конфигурацию обоих для вашей справки.

Проблема в том, что он читает файл CSV, но из-за изменения имен заголовков все остальные поля отображаются как пустые (я изменил имена заголовков, потому что мне нужно записать имя заголовка как MxB00, чтобы соответствовать заголовкам, определенным в MySQL Таблица). Я получаю значения ID и Time, потому что я не изменил имя заголовка этих полей в определении таблицы CSVWritter и MySQL. поэтому те значения, которые я получаю, но для других значений я получаю пустым, потому что это становится запутанным из-за смены имени.

CSVReader

CSVSetWritter

AvroSchemaRegistry

Как я могу решить эту проблему? Любая помощь очень ценится. Спасибо!

Ответы [ 2 ]

0 голосов
/ 06 ноября 2018

Попробуйте GetFile-> ReplaceText -> ConvertRecord-> splittext-> PutdatabaseRecord.

конфиг:

Поиск значения: ввод заголовков, заменяемое значение: новые заголовки, Стратегия замены: буквальная замена, Режим оценки; Весь текст

0 голосов
/ 03 ноября 2018

Если вы хотите создать пользовательский заголовок для выходного CSV-файла , затем настройте CSV-ридер службу контроллера, как показано ниже.

Configs: enter image description here

Поскольку мы используем стратегию доступа к схеме в качестве текста схемы и задаем схему как

{
"type": "record",
"name": "SQLSchema",
"fields" : [
{"name": "ID", "type": ["null","int"]},
{"name": "TIME", "type": ["null","string"]},
{"name": "MxB01", "type": ["null","int"]},
{"name": "MxB02", "type": ["null","int"]},
{"name": "MxB03", "type": ["null","int"]}
]
}

И мы рассматриваем первую строку данных CSV как заголовок и , игнорируя имена столбцов CSV-заголовков , поэтому выходной файл потока будет иметь схему, которую мы определили выше.

Конфигурации CsvWriter:

enter image description here Поскольку мы наследуем стратегию записи схемы, выходной файл потока будет иметь тот же заголовок, который мы указали в считывателе.

Кроме того Я не уверен, почему вы используете процессор SplitText после ConvertRecord в качестве процессора PutDatabaseRecord, предназначенного для одновременной работы с кусками записей.

Даже если вы можете настроить PutDatabaseRecord с помощью службы контроллера CsvReader, упомянутой выше , тогда ваш поток будет:

Расход:

GetFile -> PutDatabaseRecord

Примечание:

Поскольку я не использовал логические типы Avro для поля метки времени, если вы используете логические типы, измените соответствующие настройки службы контроллера.

...