Gevent / Threading вызывает некоторый тупик - PullRequest
0 голосов
/ 29 октября 2019

У меня есть этот код, цель которого - дедупликация запросов.

def dedup_requests(f):
    pending = {}

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        key = _make_call_key(args, kwargs)
        if key not in pending:
            pending[key] = gevent.spawn(f, *args, **kwargs)
        result = pending[key].get()
        if key in pending:
            del pending[key]
        return result

    return wrapped

Я подозреваю, что это как-то вызывает тупик (это случается один раз, и я не могу его воспроизвести).

Это происходит как при использовании многопоточности, так и при использовании gevent.

Разрешено ли повторное использование get?

Может ли этот код даже вызвать взаимоблокировку, когда многопоточность не задействована?

Обратите внимание, что он работает под другими задачами gevent, поэтому порожденные задачи могут порождать дополнительные задачи, в случае, если это проблема.

Ответы [ 2 ]

0 голосов
/ 31 октября 2019

Хотя я до сих пор не совсем понимаю источник тупика (мое лучшее предположение, что get на самом деле не работает должным образом при вызове более одного раза), похоже, это работает:

from gevent import lock

def queue_identical_calls(f, max_size=100):
    pending = {}

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        key = _make_call_key(args, kwargs)
        if key not in pending:
            pending[key] = lock.BoundedSemaphore(1)
        lock_for_current_call = pending[key]
        lock_for_current_call.acquire()
        result = f(*args, **kwargs)
        lock_for_current_call.release()

        if len(pending) > max_size:
            pending.clear()

        return result

    return wrapped
0 голосов
/ 30 октября 2019

Ваша проблема в том, что ваш код не асинхронный. Вам нужно, чтобы сама функция обрабатывала обновление ключа, а затем выполняла проверку цикла для ваших значений. Это пример асинхронной работы. Вы можете доказать это, заметив, что последний элемент иногда отображается первым в списке.

import gevent
import random

pending = {}
def dedup_requests(key, *args, **kwargs):
    global pending
    if key not in pending:
        gevent.spawn(ftest, key, *args, **kwargs)

def ftest(key, *args, **kwargs):
    global pending
    z = random.randint(1,7)
    gevent.sleep(z)
    pending[key] = z
    return z

l = ['test','test2','test3']
for i in l:
    dedup_requests(i)

while 1:
    if set(pending.keys()) != set(l):
        print(pending)
    else:
        print(pending)
        break
    gevent.sleep(1)
...