Почему это значение массива удаляется, но все еще печатается? - PullRequest
1 голос
/ 04 апреля 2020

Этот сценарий является очень простой версией большей, чем у меня есть, и я делаю здесь, чтобы удалить «pip», если есть значение массива, подобное этому. Но проблема возникает, когда приходит второй цикл (или даже больше), пип продолжает печатать на экране, и я не хочу этого делать.

import requests, threading, random, string, json, time, queue, re

num_worker_threads = int(input("Threads: "))

lista = ['asd', 'asdjk', 'pip', 'lasd', 'lol']
print(str(lista))

def do_work(i):
    try:
        print(i.strip())
        print(str(lista))
        if i == "pip":
            lista.remove("pip")
    except Exception as e:
        print(e)


def worker():
    while True:
        item = q.get()
        if item is not None:
            do_work(item)
            q.task_done()


q = queue.Queue()

threads = []

for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for i in range(2): # I have only put 2 cycles
    for line in lista:
        q.put(line)


q.join()

for i in range(num_worker_threads):
    q.put(None)

for t in threads:
    t.join()

Ответы [ 2 ]

1 голос
/ 04 апреля 2020

Вам нужен некоторый solid контроль параллелизма в вашем многопоточном коде, вам нужно убедиться, что несколько вещей происходит по порядку:

  • Вы хотите убедиться, что очередь не читает список снова, пока поток не очистит pip от него из первого раунда, вставленного в очередь.

  • Необходимо убедиться, что потоки не изменяют одни и те же элементы в в то же время, что приведет к тому, что один из них выдаст исключение, что он не может удалить удаленный элемент.

Вы можете использовать Event, чтобы наложить некоторый контроль на поток мульти многопоточная программа, давайте объявим событие с именем first_iteration_processsed, очередь будет ждать, пока это событие будет выполнено, и начнет вторую итерацию всех элементов списка. Событие будет установлено одним из ваших потоков, как только он удалит пункт из списка.

пример кода:

import requests, threading, random, string, json, time, queue, re
from threading import Event
num_worker_threads = int(input("Threads: "))

lista = ['asd', 'asdjk', 'pip', 'lasd', 'lol']
print(str(lista))

iteration_processsed = Event()

iteration_processsed.set()
def do_work(i):
    # global lista
    try:
        print(i.strip())
        print(str(lista))
        if i == "pip":
            lista.remove("pip")
            print("Removed pip successfully")
        if i == "iter_end":
            iteration_processsed.set()
    except Exception as e:
        print(e)


def worker():
    while True:
        item = q.get()
        if item is not None:
            do_work(item)
            q.task_done()


q = queue.Queue()

threads = []

for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for i in range(3): # I have only put 2 cycles
    iteration_processsed.wait()
    print(f"Iteration {i} started")
    iteration_processsed.clear()

    for line in lista:
        q.put(line)
    q.put('iter_end')

q.join()

for i in range(num_worker_threads):
    q.put(None)

for t in threads:
    t.join()

Давайте попробуем это:

Threads: 2
['asd', 'asdjk', 'pip', 'lasd', 'lol']
Iteration 0 started
asd
['asd', 'asdjk', 'pip', 'lasd', 'lol']
asdjk
['asd', 'asdjk', 'pip', 'lasd', 'lol']
pip
['asd', 'asdjk', 'pip', 'lasd', 'lol']
Removed pip successfully
lasd
['asd', 'asdjk', 'lasd', 'lol']
lol
iter_end
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
Iteration 1 started
asd
asdjk
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
lasd
lol
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
iter_end
['asd', 'asdjk', 'lasd', 'lol']
Iteration 2 started
asd
asdjk
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
lasd
lol
['asd', 'asdjk', 'lasd', 'lol']
['asd', 'asdjk', 'lasd', 'lol']
iter_end
['asd', 'asdjk', 'lasd', 'lol']

Теперь, как вы можете видеть, вторая итерация никогда не начнется до того, как будет удален pip, конечно, здесь реализуемый объект очень сильно зависит от c, но я думаю, что вы можете настроить его для своих более общих целей и, возможно, добавить больше события для блокировки большего количества операций, которые будут выполнены в некотором предопределенном порядке. Вы можете прочитать больше о событиях из документации, или эта статья тоже хорошее начало https://www.bogotobogo.com/python/Multithread/python_multithreading_Event_Objects_between_Threads.php

0 голосов
/ 04 апреля 2020

Ваш код имеет состояние гонки. Гонка между:

  • Основным потоком, который попытается добавить "pip" в очередь дважды, один раз на каждой из его итераций в течение lista.
  • Рабочие потоки, которые коллективно выбирают элементы из очереди и обрабатывают их. Если они обрабатывают запись "pip", они удаляют ее из lista.

Если первая запись "pip" обрабатывается рабочим потоком и удаляется из lista до второй "pip" был итерирован и добавлен, тогда он не будет обрабатываться снова. Однако, исходя из вашего описания проблемы, я предполагаю, что благодаря некоторой комбинации GIL и синхронизации различных частей кода вы довольно последовательно добавляете обе копии "pip" в очередь, прежде чем рабочие смогут это сделать. что-нибудь об этом. Но это не строго гарантировано. Если вы добавили задержку между итерациями lista в основном потоке, это, вероятно, дало бы рабочим потокам время, чтобы догнать и удалить "pip", как вы ожидаете:

for i in range(2):
    for line in lista:
        q.put(line)
    time.sleep(1) # delay for a second

Очевидно, это вероятно, это не то, что вы действительно хотите сделать, поскольку это все еще гонка, только та, которая вероятно будет выиграна другими гонщиками сейчас (нет никакой гарантии, что другие потоки также не будут чрезмерно долго ). Лучшим решением было бы спроектировать код так, чтобы вообще не было условий гонки.

Одна идея могла бы состоять в join очереди между итерациями списка, чтобы все значения обрабатывались рабочие перед вами go, чтобы снова добавить их. Это очень похоже на подход time.sleep, но он более надежен, поскольку основной поток будет ждать столько времени, сколько потребуется рабочим, чтобы обработать элементы в очереди, прежде чем добавлять что-либо еще.

Другая идея возможно, нужно выполнить фильтрацию "pip" записей в основном потоке, прежде чем значения будут помещены в очередь. Это может сделать только проверка if line != "pip"!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...