Я не знаком с node.js. Но для Redis я бы попробовал это:
Допустим, у вас есть client_1, client_2, все они публикуют события. У вас есть три машины: consumer_1, consumer_2, consumer_3.
- Establi sh список задач в redis, например, JOB_LIST.
- Clients put (LPU SH) jobs в этот JOB_LIST в специальной форме c, например «CLIENT_1: [jobcontent]», «CLIENT_2: [jobcontent]»
- Каждый потребитель блокирует задания (команда RPOP Redis) и обрабатывает их. Например, потребитель_1 берет задание, контент - CLIENT_1: [jobcontent]. Он анализирует контент и распознает его от CLIENT_1. Затем он хочет проверить, обрабатывает ли уже какой-либо другой потребитель CLIENT_1, в противном случае он заблокирует ключ, чтобы указать, что он обрабатывает CLIENT_1.
Далее он устанавливает ключ «CLIENT_1_PROCESSING», с содержимым как "consumer_1", с помощью команды Redis SETNX (устанавливается, если ключ не существует) с соответствующим таймаутом. Например, задача обычно занимает одну минуту до завершения sh, вы устанавливаете тайм-аут ключа в пять минут, на случай, если потребитель_1 выйдет из строя и будет удерживать блокировку бесконечно.
Если SETNX вернет 0, это означает, что ему не удалось получить блокировку CLIENT_1 (кто-то уже обрабатывает задание client_1). Затем он возвращает задание (значение «CLIENT_1: [jobcontent]») в левую часть JOB_LIST с помощью команды Redis LPU SH. Затем он может немного подождать (несколько секунд засыпать) и выполнить другую задачу RPOP с правой стороны СПИСКА. Если на этот раз SETNX возвращает 1, потребитель_1 получает блокировку. Он переходит к обработке задания, после завершения удаляет ключ «CLIENT_1_PROCESSING», снимая блокировку. Затем выполняется RPOP для другого задания и т.д.
Блокирующая часть немного рудиментарна, но ее будет достаточно.
---------- обновить --------------
Я придумал другой способ упорядочить задачи.
Для каждого клиента (производителя) создайте список. Подобно "client_1_list", поместите sh заданий в левую часть списка. Сохраните все имена клиентов в списке client_names_list со значениями client_1, client_2, et c.
Для каждого потребителя (процессора) повторите итерацию «client_names_list», например, consumer_1 получите "client_1", проверьте, заблокирован ли ключ client_1 (кто-то уже обрабатывает задачу client_1), если нет, вытащите значение (задание) из client_1_list и заблокируйте client_1. Если client_1 заблокирован (вероятно, спит одну секунду) и перейти к следующему клиенту, например, «client_2», проверить ключи и т. Д.
Таким образом, каждый клиент (производитель задачи) задача обрабатывается в порядке их поступления.