потоки Python и общие переменные - PullRequest
0 голосов
/ 06 ноября 2010

как я могу обновить общую переменную между различными потоками. Поток в python?

Допустим, у меня есть 5 потоков, работающих над Queue.Queue ().После завершения очереди я хочу выполнить другую операцию, но я хочу, чтобы это произошло только один раз.

Можно ли совместно использовать и обновлять переменную между потоками.поэтому, когда Queue.empty () имеет значение True, это событие запускается, но если один из потоков делает это, я не хочу, чтобы другие делали это тоже, потому что я получил бы неправильные результаты.

РЕДАКТИРОВАТЬ
У меня есть очередь, которая отражает файлы в файловой системе.файлы загружаются на сайт потоками, и в то время как каждый поток загружает файл, он обновляет набор () ключевых слов, которые я получил из файлов.
когда очередь пуста, мне нужно связаться с сайтом и сказать ему:обновить количество ключевых слов.Прямо сейчас каждый поток делает это, и я получаю обновление для каждого потока, что плохо.я также пытался очистить набор, но он не работает.

keywordset = set()
    hkeywordset = set()
    def worker():
        while queue:
            if queue.empty():
                if len(keywordset) or len(hkeywordset):
                    # as soon as the queue is empty we send the keywords and hkeywords to the
                    # imageapp so it can start updating 
                    apiurl   = update_cols_url
                    if apiurl[-1] != '/':
                        apiurl = apiurl+'/'
                    try:
                        keywords = []
                        data = dict(keywords=list(keywordset), hkeywords=list(hkeywordset))
                        post = dict(data=simplejson.dumps(data))
                        post = urllib.urlencode(post)
                        urllib2.urlopen(apiurl, post)
                        hkeywordset.clear()
                        keywordset.clear()
                        print 'sent keywords and hkeywords to imageapp...'
                    except Exception, e: print e
            # we get the task form the Queue and process the file based on the action
            task = queue.get()
            print str(task)
            try:
                reindex = task['reindex']
            except:
                reindex = False
            data = updater.process_file(task['filename'], task['action'], task['fnamechange'], reindex)
            # we parse the images keywords and hkeywords and add them to the sets above for later 
            # processing
            try:
                for keyword in data['keywords']:
                    keywordset.add(keyword)
            except: pass
            try:
                for hkw in data['hkeywords']:
                        hkeywordset.add(hkw)
            except:pass
            queue.task_done()


    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()

    while 1:
        line = raw_input('type \'q\' to stop filewatcher... or \'qq\' to force quit...\n').strip()

это то, что я пытался в принципе.но, конечно, часть queue.empty () исполняется столько раз, сколько у меня потоков.

Ответы [ 3 ]

0 голосов
/ 06 ноября 2010

Почему вы не можете просто добавить последний шаг в очередь?

0 голосов
/ 06 ноября 2010

Иметь другую очередь, в которую вы помещаете это событие после того, как первая очередь пуста.
Или иметь специальный поток для этого события.

0 голосов
/ 06 ноября 2010

Если вы используете очередь для запуска потока ( пул потоков ), то вы убедитесь, что не будет условия гонки (потокобезопасен), потому что очередь запускает ваш поток последовательно.поэтому я думаю, что вы можете разделить переменную между потоками, и вы можете быть уверены, что не будет условия гонки для этой переменной.

Edit : Вот что-то похожее в том, что выНадеюсь, что на этот раз вы ответите на свой вопрос :):

import Queue
import threading
import ftplib
import os


class SendFileThread(threading.Thread):
     """ Thread that will handle sending files to the FTP server"""

     # Make set of keywords a class variable.
     Keywords = set()

     def __init__(self, queue, conn):

          self.conn = conn   
          self.queue = queue

          threading.Thread.__init__(self)

      def run(self):
          while True:
              # Grabs file from queue.
              file_name = self.queue.get()

              # Send file to FTP server.
              f=open(file_name,'rb')
              self.conn.storbinary('STOR '+os.path.basename(file_name),f)

              # Suppose that this keywords are in the first line.
              # Update the set of keywords.
              SendFileThread.Keywords.update(f.readline().split(" ")))

              # Signals to queue job is done.
              self.queue.task_done()


def main():
     # Files to send.
     files = os.listdir('/tosend')

     queue = Queue.Queue()

     # Connect to the FTP server.
     conn = ftplib.FTP('ftp_uri')   
     conn.login()

     # Create 5 threads that will handle file to send.
     for i in range(5):
         t = SendFileThread(queue, conn)
         t.start()

     # Fill the queue with files to be send.   
     for file in files:
         queue.put(file)

     # Wait until or thread are finish
     queue.join()

     # Send the keywords to the FTP server.
     # I didn't understand well the part update keywords count, 
     # how this count is stored ...
     # Here i will just send the keywords to the FTP server.
     with open("keywords", "w") as keywords_file
         keywords_file.write(";".join(SendFileThread.Keywords))
         conn.storbinary('STOR '+os.path.basename("keywords"),
                          keywords_file)

     conn.close()


if __name__ == '__main__':
     main()
...