Я хочу расширить ответ smartse и найти решение. Вы хотите поместить свой запрос API в определенные временные окна, смещая начальный и конечный параметры. Работа с окнами может работать следующим образом:
- Инициализировать параметры начальной, конечной отметки времени
- Поместить эти параметры в качестве атрибутов в поток
- Нижестоящие процессоры могут вызывать API, используя эти параметры
- После этого вы должны установить start = previous_end + 1 секунду и end = now
Когда вы определяете новое окно для следующего запуска, вам нужны параметры изпредыдущий прогон. Вот почему вы должны помнить эти ценности. Вы можете добиться этого, используя распределенный кэш карт NiFi.
Я собрал для вас поток:
![enter image description here](https://i.stack.imgur.com/hxRvB.png)
Увеличение Get next date range
:
![enter image description here](https://i.stack.imgur.com/8dKQx.png)
Конечный параметр всегда теперь, так что вам просто нужно сохранить начальный параметр. FetchDistributedMapCache
выберет это для вас и поместит в атрибут stored.state
:
![enter image description here](https://i.stack.imgur.com/t62Yv.png)
Set time range
процессор инициализирует параметры:
![enter image description here](https://i.stack.imgur.com/Q0SQS.png)
Обратите внимание, что конец всегда наступает, и начало - это либо начальная дата (для первого запуска), либо последний параметр окончания плюс 1 секунда. В этот момент поток направляется в вывод Time range
, где вы можете вызвать свой API в нисходящем направлении. Кроме того, вы должны обновить хранимое значение. Это происходит в ReplaceText
процессоре:
![enter image description here](https://i.stack.imgur.com/idPna.png)
Наконец вы обновляете состояние:
![enter image description here](https://i.stack.imgur.com/I30sL.png)
Жизненный цикл параметров привязан к идентификатору кэша. Когда вы меняете идентификатор, вы начинаете с нуля.