Why do I get thread starvation when using threading.Lock and 'with'? (Python 3.6.6) - PullRequest
October 14, 2018

tl;dr

Using threading.Lock with a with statement leads to thread starvation (one thread gets all or most of the work):

with lock:
    next(generator)

But calling acquire() and release() manually gives a much more even distribution of work across the threads:

lock.acquire()
next(generator)
lock.release()
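For comparison, a with block on a Lock behaves like acquire() followed by a try/finally that calls release(), so the lock is released even if next() raises (for example StopIteration once the generator is exhausted). The sketch below puts both forms side by side; the function names advance_with and advance_manual are illustrative and not from the question. When both forms cover the same statements they should behave the same, so a difference in scheduling points to what sits inside the locked region rather than to with itself.

```python
import threading

lock = threading.Lock()

def advance_with(gen):
    # The with statement acquires the lock on entry and releases it on exit,
    # even if next() raises.
    with lock:
        return next(gen)

def advance_manual(gen):
    # Roughly what the with statement does for a Lock.
    lock.acquire()
    try:
        return next(gen)
    finally:
        lock.release()

g = iter([1, 2])
print(advance_with(g), advance_manual(g))  # 1 2
try:
    advance_manual(g)  # exhausted: raises StopIteration
except StopIteration:
    pass
print(lock.locked())  # False: the finally clause released the lock
```

Note that the bare acquire()/next()/release() sequence in the snippet above has no finally, so an exception from next() would leave the lock held.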

Full code and sample output

Synchronization using threading.Lock() and with

import threading
import queue
import time
import random

class ThreadedGen:
  def __init__(self, generator):
    self._generator = generator
    self._queue = queue.Queue(10)
    self._lock  = threading.Lock()

  def _run(self):
    while True:
      with self._lock: # <==== use with, no explicit call to acquire()/release()
        try:
          # get the next item from the generator
          # put the thread name and item as a tuple on the queue
          n = next(self._generator)
          self._queue.put((threading.current_thread().name, n))
        except Exception as e:
          print(threading.current_thread().name, e)

  def start(self):
    for _ in range(3): # spawn three threads
      threading.Thread(target=self._run, daemon=True).start() # daemon: let the program exit once main has read 100 items

  def get(self):
    while True:
      yield self._queue.get()

if __name__ == '__main__':
  def gen():
    n = 0
    while n < 100:
      s = random.randint(0,5)
      time.sleep(s) # simulate work
      yield n
      n += 1

  t = ThreadedGen(gen())
  t.start()
  outputgen = t.get()
  for _ in range(100):
    (tname, n) = next(outputgen)
    print(f'got {n} from {tname}')

Output:

got 0 from Thread-1
got 1 from Thread-1
got 2 from Thread-1
got 3 from Thread-1
got 4 from Thread-1
got 5 from Thread-1
got 6 from Thread-1
got 7 from Thread-1
got 8 from Thread-1
got 9 from Thread-1
got 10 from Thread-1
got 11 from Thread-1
got 12 from Thread-1
got 13 from Thread-1
got 14 from Thread-1
got 15 from Thread-1
got 16 from Thread-1
got 17 from Thread-1
got 18 from Thread-1
got 19 from Thread-1
got 20 from Thread-1
got 21 from Thread-1
got 22 from Thread-1
got 23 from Thread-1
got 24 from Thread-1
got 25 from Thread-1
got 26 from Thread-1
got 27 from Thread-1
got 28 from Thread-1
got 29 from Thread-1
got 30 from Thread-1
got 31 from Thread-1
got 32 from Thread-1
got 33 from Thread-1
got 34 from Thread-1
got 35 from Thread-1
got 36 from Thread-1
got 37 from Thread-1
got 38 from Thread-1
got 39 from Thread-1
got 40 from Thread-1
got 41 from Thread-1
got 42 from Thread-1
got 43 from Thread-1
got 44 from Thread-1
got 45 from Thread-1
got 46 from Thread-1
got 47 from Thread-3
got 48 from Thread-3
got 49 from Thread-3
got 50 from Thread-3
...

Synchronization using threading.Lock() and explicit acquire()/release()

import threading
import queue
import time
import random

class ThreadedGen:
  def __init__(self, generator):
    self._generator = generator
    self._queue = queue.Queue(10)
    self._lock  = threading.Lock()

  def _run(self):
    while True:
      try:
        # get the next item from the generator
        # put the thread name and item as a tuple on the queue
        self._lock.acquire() # <==== explicit acquire() call
        n = next(self._generator)
        self._lock.release() # <==== explicit release() call
        self._queue.put((threading.current_thread().name, n))
      except Exception as e:
        self._lock.release()
        print(threading.current_thread().name, e)

  def start(self):
    for _ in range(3): # spawn three threads
      threading.Thread(target=self._run, daemon=True).start() # daemon: let the program exit once main has read 100 items

  def get(self):
    while True:
      yield self._queue.get()

if __name__ == '__main__':
  def gen():
    n = 0
    while n < 100:
      s = random.randint(0,5)
      time.sleep(s) # simulate work
      yield n
      n += 1

  t = ThreadedGen(gen())
  t.start()
  outputgen = t.get()
  for _ in range(100):
    (tname, n) = next(outputgen)
    print(f'got {n} from {tname}')

Output:

got 0 from Thread-1
got 1 from Thread-2
got 2 from Thread-3
got 3 from Thread-1
got 4 from Thread-2
got 5 from Thread-3
got 6 from Thread-1
got 7 from Thread-2
got 8 from Thread-3
got 9 from Thread-1
got 10 from Thread-1
got 11 from Thread-3
got 12 from Thread-2
got 13 from Thread-1
got 14 from Thread-3
got 15 from Thread-2
got 16 from Thread-1
got 17 from Thread-3
got 18 from Thread-2
got 19 from Thread-1
got 20 from Thread-3
got 21 from Thread-2
got 22 from Thread-1
got 23 from Thread-3
got 24 from Thread-2
got 25 from Thread-1
got 26 from Thread-3
got 27 from Thread-2
got 28 from Thread-1
got 29 from Thread-3
got 30 from Thread-2
got 31 from Thread-1
got 32 from Thread-3
got 33 from Thread-2
got 34 from Thread-2
got 35 from Thread-2
got 36 from Thread-3
got 37 from Thread-1
got 38 from Thread-2
got 39 from Thread-3
got 40 from Thread-1
got 41 from Thread-2
got 42 from Thread-2
got 43 from Thread-1
got 44 from Thread-3
got 45 from Thread-2
got 46 from Thread-1
got 47 from Thread-3
got 48 from Thread-2
got 49 from Thread-1
got 50 from Thread-1

1 Answer

October 14, 2018

I figured it out while typing up the question (thanks, SO!). The problem was that I didn't place the with directly around the call to next(self._generator).

Wrong:

  with self._lock:
    try:
      # get the next item from the generator
      # put the thread name and item as a tuple on the queue
      n = next(self._generator)
      self._queue.put((threading.current_thread().name, n))
    except Exception as e:
      print(threading.current_thread().name, e)

Right:

    try:
      # get the next item from the generator
      # put the thread name and item as a tuple on the queue
      with self._lock:
        n = next(self._generator)
      self._queue.put((threading.current_thread().name, n))
    except Exception as e:
      print(threading.current_thread().name, e)
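A self-contained version of the corrected class might look like the sketch below. It assumes you want the workers to stop once the generator is exhausted (instead of looping on StopIteration forever) and to run as daemon threads; the workers and maxsize parameters and the shorter sleeps are illustrative additions, not part of the original code. The lock is still needed because a generator cannot be resumed by two threads at once (CPython raises ValueError: generator already executing). Since the simulated work runs inside next(), it is still serialized by the lock; the fix only keeps queue.put() out of the critical section.

```python
import queue
import random
import threading
import time


class ThreadedGen:
    """Sketch of the question's class with the lock scoped to next() only."""

    def __init__(self, generator, workers=3, maxsize=10):
        self._generator = generator
        self._queue = queue.Queue(maxsize)
        self._lock = threading.Lock()
        self._workers = workers

    def _run(self):
        name = threading.current_thread().name
        while True:
            try:
                # Only advancing the shared generator needs the lock.
                with self._lock:
                    n = next(self._generator)
            except StopIteration:
                return  # generator exhausted: let this worker finish
            # put() may block while the queue is full, but the lock is
            # already released here, so other workers can reach the generator.
            self._queue.put((name, n))

    def start(self):
        for _ in range(self._workers):
            threading.Thread(target=self._run, daemon=True).start()

    def get(self):
        while True:
            yield self._queue.get()


if __name__ == '__main__':
    def gen(count=20):
        for n in range(count):
            time.sleep(random.uniform(0, 0.05))  # simulate (shorter) work
            yield n

    t = ThreadedGen(gen())
    t.start()
    outputgen = t.get()
    for _ in range(20):
        tname, n = next(outputgen)
        print(f'got {n} from {tname}')
```

Which thread wins the lock on each iteration is still up to the OS scheduler, so the exact distribution will vary from run to run.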

By default, queue.put() blocks while the queue is full. This means the lock may not be released right away, which keeps the other threads away from the generator. In effect they are starved: to get the lock, they have to grab it in the short window after the first thread releases it and before that same thread re-acquires it at the top of its loop (hence the race condition).
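The blocking behaviour of queue.put() that this explanation relies on can be seen in isolation. The sketch uses a queue of size 1 and a timeout so that the wait becomes visible instead of hanging forever; with the default arguments (block=True, timeout=None), the second put() would simply wait until a consumer calls get().

```python
import queue

q = queue.Queue(1)   # bounded queue, like Queue(10) in the question
q.put('first')       # fits
try:
    # The queue is full, so put() waits; the timeout makes it give up
    # after 0.1 s and raise queue.Full instead of blocking forever.
    q.put('second', timeout=0.1)
except queue.Full:
    print('put() blocked for 0.1 s and gave up: the queue is full')

print(q.get())       # frees a slot, prints first
q.put('second')      # now succeeds immediately
print(q.qsize())     # 1
```

If that blocking put() sits inside a with lock: block, every other thread waiting on the lock waits for the consumer too.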

Summary:

Place the with as close as possible to the call that actually needs synchronization, so that other blocking calls don't delay the other threads' access to the shared resource.
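One way to apply this rule more generally is to wrap the shared iterator so that the lock covers exactly one next() call and nothing else. The LockedIterator class below is a hypothetical helper written for illustration, not a standard library class; anything the caller does with the item afterwards (queue.put(), I/O, ...) happens outside the critical section.

```python
import threading


class LockedIterator:
    """Hypothetical helper: serializes next() on a shared iterator."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # The lock is held only while the underlying iterator advances.
        with self._lock:
            return next(self._it)


def squares():
    for n in range(1000):
        yield n * n


shared = LockedIterator(squares())
results = []
results_lock = threading.Lock()

def worker():
    for value in shared:       # each next() is serialized by LockedIterator
        with results_lock:     # a separate, equally short critical section
            results.append(value)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results), sorted(results) == [n * n for n in range(1000)])  # 1000 True
```

Every item is delivered exactly once, and when the generator is exhausted each worker's for loop ends on its own StopIteration.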

...