Как разобрать / извлечь имя атрибута и его значение с помощью NiFi - PullRequest
0 голосов
/ 27 декабря 2018

поэтому у меня есть файл в следующем формате:

{"Field_1": 0.01, "abc_id": 5, "pqr_id": "0", "xyz_id": 946715026, "count":130, «Epoch»: «130723413», «измерение»: «Град / с»} {«Field_2»: 0,01, «abc_id»: 2, «pqr_id»: «5», «xyz_id»: 841712547, «count»: 190, «Эпоха»: «130723414», «измерение»: «м / с2»}

-

-

-

-

-

..... и т. Д. У меня есть несколько строк с именем атрибута: values.

Какой процессор использовать для разделения атрибута в виде полей / столбцов и анализаего соответствующие значения.

Я хочу проанализировать файл таким образом, чтобы можно было извлекать имена полей и значения отдельно, поскольку моя конечная цель - передать данные файла в MySQL (схема таблицы в MySQL определена ниже :)

Field_1, Field_2, abc_id, pqr_id, xyz_id, count, Epoch, измерение

Как мне достичь полного варианта использования в NiFi?Я думаю, что могу достичь этого, используя какое-то регулярное выражение, но я не уверен, какое регулярное выражение поможет мне получить желаемый результат.Любое предложение высоко ценится.Спасибо!

1 Ответ

0 голосов
/ 14 января 2019

Это можно сделать с помощью потоковых процессоров ConvertJSONToSQL и PutSQL.Для анализа файла JSON не требуется никаких дополнительных действий.

Процессор ConvertJSONToSQL преобразует FlowFile в формате JSON в инструкцию SQL UPDATE, INSERT или DELETE и не вставляет данные непосредственно в базу данных.Ожидается, что входящий FlowFile будет плоским сообщением JSON, то есть он состоит из одного элемента JSON, и каждое поле отображается в простой тип.После успешного преобразования исходный FlowFile направляется в отношение «оригинал», а SQL направляется в отношение «sql».

PutSQL выполняет команду SQL UPDATE или INSERT.Предполагается, что содержимое входящего FlowFile будет командой SQL для выполнения.Этот процессор фактически вставляет данные в базу данных;

Ваш FlowFile:

{"Field_1":0.01,"abc_id":5,"pqr_id":"0","xyz_id":946715026,"count":130,"Epoch":"130723413","measurement":"Grad/s"} 

Целевая таблица должна быть доступна в целевой базе данных;Тестовая настройка использует базу данных «tempdb» и таблицу «test_db».

Оператор DDL таблицы:

create table test_db (Field_1 double, abc_id int, pqr_id varchar(5), xyz_id int(11), count int, Epoch varchar(15), measurement varchar(30));

Также необходимо создать службу контроллера DBCPConnectionPool, которая обеспечивает подключение к базе данных.служба объединения в пул.

Прикрепление примеров снимков экрана:

Поток:

Nifi flow

DBCPConnectionPool:

Обратите внимание, что URL-адрес подключения имеет целевое имя базы данных.

Controller service setting

ConvertJSONToSQL Processor: enter image description here

Процессором GetFile может быть любой другой процессор (ы) / поток, который предоставляет файл выходного потока в формате JSON.

...