Извлечение базы данных с NiFi - PullRequest
       24

Извлечение базы данных с NiFi

0 голосов
/ 26 сентября 2018

Я хочу извлечь все записи из моей таблицы 'nifitest' в моей базе данных SQL, называемой клиентами.Я прилагаю шаблон для вашей помощи, ребята.Я сталкиваюсь со следующими проблемами: После запуска процессора 'QueryDatabaseTable' я получаю потоковые файлы в очереди, но когда я очищаю очередь и пытаюсь перезапустить процессор 'QueryDatabaseTable', я не могу получить записи в очереди.Как я могу решить эту проблему?

Вся идея потока состоит в том, чтобы извлечь записи из базы данных mySql и сохранить файл локально на моем рабочем столе.

Я использовал шаблон на следующем веб-сайте, чтобыдобиться этого и изменил мое имя таблицы и службу пула соединений с базой данных в соответствии с моим требованием: -

https://www.batchiq.com/database-extract-with-nifi.html

Кроме того, после слияния я получаю файл как тип файла.Какой процессор использовать для хранения объединенного файла в виде файла CSV Тип (Все записи должны храниться как Test.csv вместо Тип файла) Я использовал процессор UpdateAttribute и добавлял свойство 'имя файла ' и значение как' Test.csv ', но в выходном файле я вижу только одну запись, а не все записи.Как это исправить?

Спасибо!

1 Ответ

0 голосов
/ 26 сентября 2018

QueryDatabaseTable будет сохранять состояние при запуске и извлекает only the incremental records, которые были добавлены в таблицу при повторном запуске.

Чтобы получить все записииз таблицы необходимо очистить состояние процессора , тогда только вы сможете получить все записи из таблицы.

Как очистить состояние QueryDatabaseTableпроцессор?

 1. Stop Query Database processor //make sure no threads are running at top right corner

 2. Right Click on the processor

 3. Go to View state tab

 4. Click on clear state //this will clear all the stored state in the processor

enter image description here См. эту ссылку для получения дополнительной информации об очистке состояния процессора таблицы QueryDatabase

-> Другая проблема связана с процессором MergeContent , поскольку

Минимальное количество записей установлено на 100 , поэтому процессор будетдождитесь, пока в очереди не окажется 100 потоковых файлов, прежде чем процессор MergeContent.

Используйте Свойство Max Bin Age значение до 1 мин ..etc, чтобы процессор принудительно объединялflowfiles и перенос объединенного потока в Merged relationship.

См. эту ссылку для более подробной информации об использовании / конфигурации процессора MergeContent.

ОБНОВЛЕНИЕ:

1.

Если вы хотите преобразовать окончательный формат файла в csv , тогда сохраните в L путь к осям.

Затем вы можете использовать ConvertRecord Процессор после QueryDatabaseTable Нет необходимости преобразовывать Avro --> Json --> Csv

Конфигурировать ConvertRecord процессор как Avro Reader и CsvRecordSetWriter службы контроллера, затем процессор считывает данные Avro, затем преобразует их в формат Csv.

Чтобы изменить имя файла потока, используйте UpdateAttribute процессор добавляет желаемое имя файла как новое свойство процессора .

Поток:

  - QueryDatabaseTable //get data from source in avro format 
  - ConvertRecord //convert Avro format to CSV format
  - UpdateAttribute //change filename
  - PutFile //store the csv file into local

2.

Если вас беспокоит, что данные хранятся в виде одной строки в выходном поточном файле, то настройте процесс слияния контента.r как

Delimiter Strategy Text

Demarcator shift+enter

См. эту ссылку для получения более подробной информации об этих конфигурациях стратегии разделителя содержимого слияния.

с помощью этих конфигов мы не меняем форматданные (данные будут все еще в json), но мы добавляем новую строку после каждой записи json.

...