NiFi создает индексы после вставки записей в таблицу - PullRequest
0 голосов
/ 02 апреля 2019

У меня есть первая группа процессов, которая удаляет индексы в таблице.Затем этот маршрут переходит к другой группе процессов, которую он вставляет в таблицу.После успешной вставки полмиллиона строк я хочу создать индексы для таблицы и проанализировать их.Это типичная методология хранилища данных.Может кто-нибудь дать совет, как это сделать?

Я пытался установить счетчики, но не могу ссылаться на счетчики на языке выражений.Я пробовал RouteOnAttribute, но ничего не получилось.Сейчас я копаюсь в процессорах ожидания и оповещения - может быть, там есть какое-то решение ??

Я получил счетчики для подсчета операторов вставки sql потокового файла, но не может ссылаться на значения счетчика через язык выражений.Т.е. это всегда возвращает нули: "$ {InsertCounter}", где InsertCounter устанавливается правильно, он появляется через мой процесс UpdateCounter в моем потоке.

Так, может быть, этот код можно использовать?

В процессоре ожидания установите для параметра Количество целевых сигналов значение $ {фрагмент.count}.

Установите идентификатор сигнала освобождения в обоихпроцессор уведомлений и ожидания $ {фрагмент.идентификатор}

ничего не работает

Ответы [ 2 ]

1 голос
/ 03 апреля 2019

Для этого вы можете использовать процессоры Wait / Notify. Я полагаю, вы используете ExecuteSQL, SplitAvro? Если это так, поток будет выглядеть так:

Сплит подход

Во 2-й группе процессов

  • ExecuteSQL: например, 1 выходной FlowFile, содержащий 5000 записей
  • SpritAvro: создает 5000 FlowFiles, этот процессор добавляет атрибуты fragment.identifier и fragment.count (= 5000).
    • раскол:
      • XXXX: выполнить некоторые преобразования для каждой записи
      • PutSQL: вставка записей по отдельности
      • Уведомление: увеличение счетчика для fragment.identifier (идентификатора сигнала разблокировки) в 1. Выполнено 5000 раз.
    • оригинал - к следующей группе процессов

В 3-й группе процессов

  • Ожидание: ожидание, пока fragment.identifier (идентификатор сигнала разблокировки) достигнет fragment.count (количество целевых сигналов). Этот маршрут обрабатывает original FlowFile, поэтому выполняется только один раз.
  • PutSQL: выполнить запрос для создания индексов и анализа таблиц

В качестве альтернативы, если это возможно, использование процессоров с поддержкой записи сделало бы поток проще и эффективнее.

Запись приближения

  • ExecuteSQL: например, 1 выходной FlowFile, содержащий 5000 записей
  • Выполнение преобразования уровня записи: с помощью UpdateRecord или LookupRecord вы можете выполнять обработку данных, не разбивая записи на несколько FlowFiles.
  • PutSQL: выполнить запрос для создания индексов и анализа таблиц. Поскольку единственный FlowFile, содержащий все записи, не требует ожидания / уведомления, выходной FlowFile может быть подключен к нисходящему потоку.
0 голосов
/ 03 апреля 2019

Я думаю, что мое предложение по этому вопросу будет вписываться и в ваш сценарий

Как запустить процессор только тогда, когда другой процессор не выполняется?

Проверить этоиз

...