Всегда потреблять из очереди, группируя похожие предметы - PullRequest
0 голосов
/ 25 февраля 2020

Примечание. Это минимальный воспроизводимый пример реальной ситуации

Последовательность шагов:

1. Я добавляю задачи в очередь сообщений. задачи, например

("analysis_id": 10, "cell": "A")

("analysis_id": 10, "cell" : "B")

("analysis_id": 10, "cell": "C")

("analysis_id": 11, "cell": "A")

("analysis_id": 11, "cell": "B")

Как видите, каждый analysis_id может иметь несколько ячеек.

2. У меня есть потребитель, который потребляет «каждую» задачу и обрабатывает ее

например python3.7 run_my_tasks.py -analysis_id <analysis_id> -cell <cell>

3. Мой потребитель должен быть всегда живым

Это означает, что обратный вызов потребителя должен всегда ждать новых сообщений.

4. Мне нужно иметь возможность сгруппировать подобные ячейки analysis_id в один список и обработать их вместе

Итак, я хочу добиться:

list_to_send = [A, B , C]

list_to_send = [A, B]

5. Делая это, если я добавляю в старый список, я могу добавить. И если мне нужно создать новый список, я могу отправить старый список, list_to_send = [], а затем начать добавление еще раз

Вот то, что я придумал, но я не совсем уверен, что смогу заставить это работать. Я прокомментировал в коде, чтобы показать мои логи c.

from time import sleep

"""
Initially analysis_id and list_to_send are both empty. 
In the actual code, my consumer will get the json from the message queue, and the json contains both analysis_id and cell.

In pseudo code:

If it's a new analysis_id (not same as the previous)
    Send the previous list for processing
    Create new_list
    Append to new_list
If it's the same as previous analysis_id
    append to list
And so on...
"""


analysis_id = "TGSA"
list_to_send = []

# This is my message queue. Think of TGSA as analysis_id: 10, and TGSB as analysis_id: 11, and TGSC as analysis_id: 12
rab_q = ["TGSA", "TGSA", "TGSA", "TGSA", "TGSB",
         "TGSB", "TGSC", "TGSC", "TGSC", "TGSC", "TGSD", "TGSD", "TGSD", "TGSD", "TGSD", "TGSD"]


def my_func(_id):
    # Use the gloabl variables
    global analysis_id
    global list_to_send

    # if list_to_send is empty (which initially it should be)
    if not list_to_send:
        # append the analysis_id "TGS*" to my list to send
        if _id == analysis_id:
            # First is equal, so append to list.
            list_to_send.append(_id)
        elif _id != analysis_id:
            # If the list is empty, and the analysis_id are different
            # change analysis_id and append to list
            analysis_id = _id
            list_to_send.append(analysis_id)
    # if list_to_send is NOT empty
    else:
        # If the analysis_id is equal, append
        if _id == analysis_id:
            list_to_send.append(_id)
        # If analysis_id is not equal, and list is NOT empty, that means this is a new run
        elif _id != analysis_id:
            # print(list_to_send) is basically where I will send_tasks(list_to_send)
            print(list_to_send)
            # list_to_send has been sent for processing, reinstantiate an empty list
            list_to_send = []
            # and append the current analysis_id to the new list
            analysis_id = _id
            list_to_send.append(analysis_id)


# Add each message to my `callback` or `consumer`
for i in rab_q:
    my_func(i)


По сути, я пытаюсь никогда не выходить из потребителя / обратного вызова и, тем не менее, группировать аналогичные анализ_id вместе, отправлять список analysis_id для обработки, а затем снова пустой список, чтобы быть готовым к следующему уровню analysis_ids.

Я получил его на работу, НО, он пропускает последний анализ analysis_id. Запустите код, чтобы узнать больше.

1 Ответ

0 голосов
/ 25 февраля 2020

В случае, если у кого-то появится этот уникальный вариант использования в будущем, вот как мне удалось его решить.

Если у кого-то есть более эффективный способ решения этой проблемы, я очень хочу увидеть ваше решение.

from time import sleep

analysis_id = "TGSA"
list_to_send = []
last_message = False
rab_q = ["TGSA", "TGSA", "TGSA", "TGSA", "TGSB",
         "TGSB", "TGSC", "TGSC", "TGSC", "TGSC",
         "TGSD", "TGSD", "TGSD", "TGSD", "TGSD", "TGSD"]


def my_func(_id):

    global analysis_id
    global list_to_send
    global last_message
    global count

    if not list_to_send:

        if _id == analysis_id:

            list_to_send.append(_id)
        elif _id != analysis_id:

            analysis_id = _id
            list_to_send.append(analysis_id)

    else:

        if _id == analysis_id and not last_message:
            list_to_send.append(_id)

        elif _id != analysis_id:
            print(f'send this for processong: {list_to_send}')
            list_to_send = []
            analysis_id = _id
            list_to_send.append(analysis_id)
        elif _id == analysis_id and last_message:
            print(f'send this for processong: {list_to_send}')
            list_to_send = []
            analysis_id = ""
            count = 0
            last_message = False


# Add each message to my `callback` or `consumer`
total_cur_jobs = len(rab_q)
count = 0

for i in rab_q:
    count += 1
    if count == total_cur_jobs:
        last_message = True
    my_func(i)

print(
    f"now that all the processing has ended lets see what we have \n analysis_id: {analysis_id}, list_to_send: {list_to_send}, last_message: {last_message} and finally, count: {count}")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...