как я могу сбросить счетчик уведомлений ожидания в нифи - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть поток, в котором мне приходится каждый раз ждать точные 12 файлов.

, поэтому я написал код, подобный

getHDFS -> notify -> wait -> некоторая обработка.

уведомление имеет следующую конфигурацию

Release Signal Identifier = eredee_45rgfyWQQWQ
Signal Counter Name = cntr_for_run
Signal Counter Delta = 1
Signal Buffer Count = 1

ожидание имеет следующую конфигурацию

Release Signal Identifier = eredee_45rgfyWQQWQ
Target Signal Count = 12
Signal Counter Name = cntr_for_run
Wait Buffer Count = 1
Releasable FlowFile Count = 1

, это работает впервые, ион выпускает файлы только тогда, когда общее чтение составляет 12. но как только приходит 13-й файл, он просто проходит мимо.Я хочу, чтобы он работал таким образом в следующий раз, когда файл ожидания релизов, когда общее количество составляет 24 (я имею в виду следующие 12 файлов).Как я могу сбросить счетчик хода обратно на ноль?

Ответы [ 3 ]

0 голосов
/ 18 февраля 2019

Значение или результаты оператора языка выражений атрибутов, которые будут сравниваться с FlowFile для определения дельты счетчика сигналов.Укажите, насколько должен увеличиваться счетчик.Например, если множественные события сигнала обрабатываются в восходящем потоке ориентированным на пакет способом, количество обработанных событий может быть уведомлено с помощью этого свойства сразу.Ноль (0) имеет особое значение: он сбрасывает счетчик целей до 0, что особенно полезно при использовании с режимом ожидания Releasable FlowFile Count = Zero (0), чтобы обеспечить тип управления потоком «open-close-gate».Один (1) может открыть соответствующий процессор ожидания, а Zero (0) может отменить его, как если бы он закрывал гейт.

(Описание свойства Signal Counter Delta процессора Notify.) Таким образом, вы можете просто установить счетчик дельта на ноль, запустить его, и вот он у вас.

Кроме того, попробуйте использовать UpdateAttribute и обновлять счетчик каждые 12 потоковых файлов.

UpdateAttribute: начальное значение переменных с состоянием = 1 Состояние хранилища = Store State Locally count = ${getStateValue("count"):equals(13):ifElse('1', ${getStateValue("count"):plus(1)}}

=> RouteOnAttribute: verify = ${count:equals(13)}

Matched => Сброс с помощью Notify (описано выше)

Unmatched => продолжить как обычно

Я должен предупредить вас, что он будет работать только на одном узле(поскольку состояние UpdateAttribute хранится локально на узле)

0 голосов
/ 02 мая 2019

У меня была такая же проблема.И решить это: в свойстве Ждать процесс использования

Режим копирования атрибута = Заменить, если присутствует

И в моем случае это работает только при

Заменить при наличии = всего

0 голосов
/ 13 февраля 2019

Я не пробовал этого, но вы можете попробовать использовать процессор PutDistributedMapCache, чтобы установить пустое значение для ключа кэша идентификатора сигнала, который вы используете в Wait / Notify.

В качестве альтернативы, вы можете сгенерироватькаким-то образом использовать новый идентификатор сигнала выпуска для каждого пакета файлов и использовать язык выражений в процессорах Notify и Wait для ссылки на динамическое значение вместо жесткого кодирования идентификатора.

...