NIFI - процессор QueryDatabaseTable.Как запросить строки, которые изменены? - PullRequest
0 голосов
/ 18 февраля 2019

Я работаю над потоком данных NIFI, где мой вариант использования - извлечение данных таблицы mysql и помещение их в файловую систему hdfs / local.

Я создал конвейер потока данных, в котором я использовал процессор querydatabaseTable ------ ConvertRecord --- процессор putFile.

Схема моей таблицы ---> идентификатор, имя, город, дата создания

Я могу получать файлы в месте назначения, даже когда я вставляю новые записи вtable

Но, но ....

Когда я обновляю существующие строки, процессор не выбирает эти записи, похоже, что у него есть некоторые ограничения.

Мой вопрос такой,Как справиться с этим сценарием?либо любым другим процессором, либо необходимо обновить какое-либо свойство.

ПОЖАЛУЙСТА, помогите кому-нибудь @Bryan Bende enter image description here

1 Ответ

0 голосов
/ 18 февраля 2019

QueryDatabaseTable Процессор должен быть информирован о том, какие столбцы он может использовать для идентификации новых данных.

Последовательная отметка времени id или created недостаточна.

Из документации:

Столбцы максимального значения:

Список имен столбцов через запятую.Процессор будет отслеживать максимальное значение для каждого столбца, который был возвращен с момента запуска процессора.Использование нескольких столбцов подразумевает порядок в списке столбцов, и ожидается, что значения каждого столбца будут увеличиваться медленнее, чем значения предыдущих столбцов.Таким образом, использование нескольких столбцов подразумевает иерархическую структуру столбцов, которая обычно используется для разделения таблиц.Этот процессор может использоваться для извлечения только тех строк, которые были добавлены / обновлены с момента последнего поиска.Обратите внимание, что некоторые типы JDBC, такие как бит / логическое значение, не способствуют поддержанию максимального значения, поэтому столбцы этих типов не должны указываться в этом свойстве, что приведет к ошибкам во время обработки.Если столбцы не предоставлены, будут рассмотрены все строки таблицы, что может повлиять на производительность.ПРИМЕЧАНИЕ. Важно использовать согласованные имена столбцов максимального значения для данной таблицы, чтобы инкрементная выборка работала должным образом.

Если судить по схеме таблиц, нет никакого sql-способа определить, были ли данныеобновлено.

Есть много способов решить эту проблему.В вашем случае проще всего было бы переименовать столбец created в modified и установить now () при обновлении или работать со вторым столбцом отметки времени.

Например, добавлен новый столбец

| stamp_updated | timestamp | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |

.В процессоре вы используете столбец stamp_updated для определения новых данных processor properties

Не забудьте установить Maximum-value Columns для этих столбцов.

Так чтоЯ в основном говорю:

Если вы сами не можете сказать, что это новая запись в sql, nifi тоже не сможет.

...