Пользовательский процессор NiFi - чтение базы данных - PullRequest
0 голосов
/ 08 июня 2019

Я новичок в NiFi и разрабатываю собственный процессор для получения самых последних данных из представления базы данных psql.Я могу получить представление базы данных с помощью приведенного ниже кода при инициализации пользовательского процессора.

private void GetData(){
    Connection connection = DriverManager.getConnection("jdbc:postgresql://example:5432/example", "user", "pass");
    Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
    ResultSet rs = statement.executeQuery("SELECT * FROM Example_Table");
    while(rs.next()){
        //Get data from database
    }
    connection.close();
}

Однако я изо всех сил пытаюсь получить последние обновления из представления базы данных.Основная проблема - когда новая запись добавляется в базу данных.Так как база данных запрашивается при инициализации процессора, пользовательский процессор не будет иметь новую запись.

Я попытался реализовать запрос в public void onTrigger () function;однако это приведет к резервному копированию канала, поскольку он будет запрашивать базу данных по каждому потоковому файлу (что не идеально, если в секунду поступают тысячи потоковых файлов).

Есть ли способ запроса базы данныхпри запуске процессора;не запрашивая базу данных для каждого потока файла?В качестве альтернативы, можно ли будет определить, была ли база данных изменена, и получить данные о модификации?Или даже установить таймер для извлечения базы данных из пользовательского процессора?

Любая помощь очень ценится, спасибо заранее.

1 Ответ

1 голос
/ 10 июня 2019

Я думаю, что если вы сможете объяснить немного больше о вашем высокоуровневом сценарии использования, это может помочь вам найти решения, потому что это кажется необычным подходом.Обычно каждый процессор несет одну ответственность, поэтому некоторые процессоры взаимодействуют с базой данных и затем выводят необходимую информацию для использования другими.

Есть несколько LookupService с, которые могут быть хорошими примерами для изучения, например MongoDBLookupService.

Если ваш вариант использования на самом деле «У меня есть собственный процессор, который принимает потоковые файлы, содержащие произвольные данные, и мне нужно выполнить с ними некоторые операции, используя последние данные из этой таблицы базы данных», у вас есть несколько вариантов:

  1. Выполните запрос к базе данных с помощью метода, аналогичного описанному выше, и вызовите этот метод один раз в течение onEnabled(), чтобы получить большинство данных из таблицы, затем вызывайте его через регулярный интервалиспользуя поток, чтобы оставаться в курсе и сохранять результаты локально в поле.При запуске метода onTrigger() используйте результаты локального кэша, а не вызов базы данных.Это уменьшит задержку и даст вам данные почти в реальном времени.Обязательно очистите обработчик потока и локальное состояние с помощью метода с аннотацией @OnStopped.
  2. Выполнить запрос к базе данных, встроенный в обработку потокового файла (т. Е. onTrigger()).Это может привести к высокой задержке и блокировке пропускной способности.Вы можете потенциально увеличить количество потоковых файлов, обрабатываемых в каждом цикле выполнения, если они могут быть обработаны в пакетном режиме с помощью List<FlowFile> flowfiles = session.get(1000); (число настраивается).
  3. Если нет изменений upserts / in-place (т.е. любое изменение в таблице базы данных приведет к новым строкам), вы можете использовать дозорный запрос (SELECT COUNT(*) FROM table;) для возвратаколичество строк, сравните его с числом ранее возвращенных строк и выполняйте «дорогой» запрос, извлекая все данные, если эти числа различаются.В этом случае вы можете получить только дельта-строки, записав максимальный идентификатор или временную метку ранее полученных строк.Если возможны аппсеты, может пригодиться что-то вроде SELECT MAX(lastModified) AS mostRecentTimeModified FROM table;.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...