Я использую Flask API для потоковой передачи данных из ElasticSearch + LogStash. Я пытаюсь понять, является ли Redis Queue подходящей базой данных для использования в очереди данных для обработки данных по мере их поступления.
Вариант использования, на который я нацеливаюсь:
Flask API получает входные данные Json Events
от LogStash, с атрибутом Timestamp
, прикрепленным к каждому объекту Json.
По мере поступления сообщений Json они помещаются в Redis Queue
.
После сбора данных за 5 минут (примерно оцененных как 2,000
события json), фоновый рабочий процесс запускает модель обработки данных Batch
для доступных данных, инаправляет вывод в индекс ElasticSearch
.
По мере поступления новых данных предыдущие Json-события отбрасываются из очереди, новые события ставятся в очередь, и фоновый процесс продолжаетобработать данные, которые в данный момент находятся в очереди, и сохранить выходные данные.
Мне нужно больше ясности в шагах 4 и 5 этой архитектуры, при погодных условиях Redis Queue
- правильный компонентздесь.
Насколько я понимаю, Redis Queue dequeues
автоматически обрабатывает данные по мере их обработки, поэтому невозможно отложить обработку из очереди до тех пор, пока не будет получен необходимый объем данных.