Примечание. Это минимальный воспроизводимый пример реальной ситуации
Последовательность шагов:
1. Я добавляю задачи в очередь сообщений. задачи, например
("analysis_id": 10, "cell": "A")
("analysis_id": 10, "cell" : "B")
("analysis_id": 10, "cell": "C")
("analysis_id": 11, "cell": "A")
("analysis_id": 11, "cell": "B")
Как видите, каждый analysis_id может иметь несколько ячеек.
2. У меня есть потребитель, который потребляет «каждую» задачу и обрабатывает ее
например python3.7 run_my_tasks.py -analysis_id <analysis_id> -cell <cell>
3. Мой потребитель должен быть всегда живым
Это означает, что обратный вызов потребителя должен всегда ждать новых сообщений.
4. Мне нужно иметь возможность сгруппировать подобные ячейки analysis_id в один список и обработать их вместе
Итак, я хочу добиться:
list_to_send = [A, B , C]
list_to_send = [A, B]
5. Делая это, если я добавляю в старый список, я могу добавить. И если мне нужно создать новый список, я могу отправить старый список, list_to_send = []
, а затем начать добавление еще раз
Вот то, что я придумал, но я не совсем уверен, что смогу заставить это работать. Я прокомментировал в коде, чтобы показать мои логи c.
from time import sleep
"""
Initially analysis_id and list_to_send are both empty.
In the actual code, my consumer will get the json from the message queue, and the json contains both analysis_id and cell.
In pseudo code:
If it's a new analysis_id (not same as the previous)
Send the previous list for processing
Create new_list
Append to new_list
If it's the same as previous analysis_id
append to list
And so on...
"""
analysis_id = "TGSA"
list_to_send = []
# This is my message queue. Think of TGSA as analysis_id: 10, and TGSB as analysis_id: 11, and TGSC as analysis_id: 12
rab_q = ["TGSA", "TGSA", "TGSA", "TGSA", "TGSB",
"TGSB", "TGSC", "TGSC", "TGSC", "TGSC", "TGSD", "TGSD", "TGSD", "TGSD", "TGSD", "TGSD"]
def my_func(_id):
# Use the gloabl variables
global analysis_id
global list_to_send
# if list_to_send is empty (which initially it should be)
if not list_to_send:
# append the analysis_id "TGS*" to my list to send
if _id == analysis_id:
# First is equal, so append to list.
list_to_send.append(_id)
elif _id != analysis_id:
# If the list is empty, and the analysis_id are different
# change analysis_id and append to list
analysis_id = _id
list_to_send.append(analysis_id)
# if list_to_send is NOT empty
else:
# If the analysis_id is equal, append
if _id == analysis_id:
list_to_send.append(_id)
# If analysis_id is not equal, and list is NOT empty, that means this is a new run
elif _id != analysis_id:
# print(list_to_send) is basically where I will send_tasks(list_to_send)
print(list_to_send)
# list_to_send has been sent for processing, reinstantiate an empty list
list_to_send = []
# and append the current analysis_id to the new list
analysis_id = _id
list_to_send.append(analysis_id)
# Add each message to my `callback` or `consumer`
for i in rab_q:
my_func(i)
По сути, я пытаюсь никогда не выходить из потребителя / обратного вызова и, тем не менее, группировать аналогичные анализ_id вместе, отправлять список analysis_id для обработки, а затем снова пустой список, чтобы быть готовым к следующему уровню analysis_ids.
Я получил его на работу, НО, он пропускает последний анализ analysis_id. Запустите код, чтобы узнать больше.