проблемы параллелизма с Python событий и блокировки - PullRequest
0 голосов
/ 29 ноября 2011

Моя критическая область - это объект полки, и я использую комбинацию события и блокировки, чтобы разделить потоки. Однако это не работает. Под этим я подразумеваю зависание процесса, у меня где-то происходит неконтролируемая блокировка, и после этого ничего не происходит.

Я действительно обнаружил проблему. Замок получен, а объект полки изменен. Затем, когда возникает условие, которое выполняет event.set (), все потоки уже находятся внутри блока кода после event.wait (). Другими словами, они запускают код в условиях, которые им не следует. Я собираюсь отойти от Event и в объекты Condition, я использовал Event, потому что изначально я использовал Queue. Хотя в очереди нет постоянства, поэтому сейчас я использую этот метод.

Это выглядит примерно так:

import SocketServer
SocketServer.TCPServer.allow_reuse_address = True 
from Dropbox import Dropbox
import threading
import json

class DropboxServer(SocketServer.StreamRequestHandler):
    def handle(self):
        inp = self.request.recv(8192).strip()
        Dropbox.enqueue(json.loads(inp), "DropboxServer")
        self.request.send(json.dumps({"status":"success"}))

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999
    event = threading.Event()
    event.clear()
    lock = threading.Lock()
    Dropbox.lock = lock
    Dropbox.event = event                                                                                                                                                                                                                   
    for i in range(Dropbox.thread_count):
            thread = Dropbox()
            thread.start()
    server = SocketServer.TCPServer((HOST, PORT), DropboxServer)
    print "serving forever"
    server.serve_forever()

и

from dropbox import session, client
import Queue
import random
import string
from time import time
import shelve

class Dropbox(threading.Thread):
    thread_count = 20
    APP_KEY = 'key'
    APP_SECRET = 'sec'
    queue = Queue.Queue()
    requests = shelve.open("requests.db")

    def __init__(self):
        self.sess = session.DropboxSession(self.APP_KEY, self.APP_SECRET, 'dropbox')
        self.sess.set_token("token", "secret")
        self.dropbox = client.DropboxClient(self.sess)
        super(Dropbox, self).__init__(group=None, target=None, name=None, args=(), kwargs={})
        Dropbox.requests["dropbox"] = []

    def run(self):
        while True:
            self.put_file(Dropbox.dequeue(self.name))

    def put_file(self, request):
        file = open(request["local_file_path"])
        self.dropbox.put_file(request["destination"], file)
        file.close()
        print(self.name + " ending. " + str(time() - Dropbox.start_time))

    """ 
    {
        destination : project_name/AssetFile.png
        local_file_path : /path/to/asset.png
        token: A9s8d*d8
        secret: a9s9*38
    }
    """
    @staticmethod
    def enqueue(request, name):
        if "destination" not in request:
            for single_request in request:
                size = Dropbox.enqueue(single_request, name)
            return size
        Dropbox.lock.acquire()
        print(name + " acquired lock")
        temp = Dropbox.requests["dropbox"] 
        temp.append(request)
        print(name + " enqueue size: " + str(len(temp)))
        Dropbox.requests["dropbox"] = temp
        Dropbox.lock.release()
        size = len(temp)
        if size == 1:
            Dropbox.start_time = time()
            print(name + " event set")
            Dropbox.event.set()
        print(name + " releasing lock")                                                                                                                                                                                                     
        return size

    @staticmethod
    def dequeue(name):
        Dropbox.lock.acquire()
        print(name + " acquired lock")
        temp = Dropbox.requests["dropbox"]
        try:
            result = temp.pop(0)
        except IndexError:
            Dropbox.event.wait()
        print(name + " dequeue size: " + str(len(temp)))
        Dropbox.requests["dropbox"] = temp
        Dropbox.lock.release()
        if len(temp)  == 0:
            print(name + " event cleared")
            Dropbox.event.clear()
        print(name + " releasing lock")
        return result
...