Python: объект thread._local не имеет атрибута todo - PullRequest
3 голосов
/ 25 февраля 2012

В настоящее время я программирую сервер дейтаграмм на основе python, используя потоки и все такое.

Я столкнулся со следующей проблемой: я использую несколько потоков распределения для распределения входящих пакетов по различным потокам обработки. Внутри потоков обработки я использую threading.local () для отслеживания локальных переменных потока.

В настоящее время я тестирую, как мой сервер реагирует на высокую нагрузку (2000 пакетов за ~ 2 секунды), и натолкнулся на странное поведение local () - Object.

Кажется, какое-то время все работает нормально, а затем в какой-то момент выдает исключение:

Exception in thread 192.168.1.102: # <-- This is the Processing Thread
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
    print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'

# The following three lines are debug
# This is the Allocation thread that has called the Processing thread
<Thread(192.168.1.102, started 1106023568)> 
# This confirms that the queue it tries to access exists
<Queue.Queue instance at 0x40662b48>
# This is the Processing thread, which has stopped executing on exception
<Thread(192.168.1.102, stopped 1106023568)> 

Exception in thread pyAlloc-0: # <-- This is the Allocation thread
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
    foundThread.addTask(str(data))
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
    print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'

часть обрабатывающей нити

# Imports
import threading
import time
import Queue

class Thread(threading.Thread):
    def __init__(self,pThread):
        threading.Thread.__init__(self)
        self.loc = threading.local()
        self.loc.todo = Queue.Queue()
        self.loc.father = pThread
        print "Konstruktor ausgefuehrt"
        print self.loc.todo

    def run(self):
        self.loc.run = True;
        print self.loc.father
        while (self.loc.run):
        try:
            task = self.loc.todo.get(True, 1)
            print "processing..."
            if(task == self):
                self.loc.run=False
            else:
                print task
        except Queue.Empty:
            pass
        self.loc.father.threadTerminated(self)
        print self.name, "terminating..."

    def addTask(self, pTask):
        print self
        print self.loc.todo
        self.loc.todo.put(pTask)

И поток выделения:

import threading
import pyProcThread # My processing Thread
import Queue
import time
class Thread(threading.Thread):
    # Lock-Objects
    threadListLock = threading.Lock()
    waitListLock = threading.Lock()

    alive = True;

    taskQueue = Queue.Queue()
    # Lists
    # List of all running threads
    threads = []

    def threadExists(self,pIP):
        """Checks if there is already a thread with the given Name"""
        for x in self.threads:
            if x.name == pIP:
                return x
        return None

    def threadTerminated(self,pThread):
        """Called when a Processing Thread terminates"""
        with self.threadListLock:
            self.threads.remove(pThread)
            print "Thread removed"

    def threadRegistered(self,pThread):
        """Registers a new Thread"""
        self.threads.append(pThread)

    def killThread(self):
        self.alive = False

    def run(self):
        while(self.alive):
            # print "Verarbeite Nachricht ", self.Message
            # print "Von ", self.IP
            try:
                data, addtemp = self.taskQueue.get(True, 1)
                addr, _junk = addtemp
                with self.threadListLock:
                    foundThread=self.threadExists(str(addr))
                    # print "Thread " + self.name + " verarbeitet " + data
                    if (foundThread!=None):
                        #print "recycling thread"
                        foundThread.addTask(str(data))
                    else:
                        print "running new Thread"
                        t = pyProcThread.Thread(self)
                        t.name = str(addr)
                        t.addTask(str(data))
                        t.start()
                        self.threadRegistered(t)
                self.taskQueue.task_done()
            except Queue.Empty:
                pass
        print self.name, "terminating..."
        with self.threadListLock:
            for thread in self.threads:
                thread.addTask(thread)

Полный вывод, включая все отладочные отпечатки:

running new Thread
Konstruktor ausgefuehrt
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, initial)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
Exception in thread 192.168.1.102:
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
    print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'

<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, stopped 1106023568)>
Exception in thread pyAlloc-0:
Traceback (most recent call last):
  File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
    foundThread.addTask(str(data))
  File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
    print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'

Terminating main
192.168.1.102
DEINEMUDDA  sent to  192.168.1.102 : 8082
pyAlloc-1 terminating...
<Thread(192.168.1.102, stopped 1106023568)>
<Queue.Queue instance at 0x40662b48>

Как видно из журнала, какое-то время все работает нормально, хотя оно никогда не достигает значения print "processing в основной функции потока обработки.

Я искал в Интернете и StackOverflow похожие проблемы, но не смог их найти. Заранее благодарен за любую помощь и извините за мой стиль кодирования. Я программирую на Python всего несколько дней и до сих пор изучаю веревки с некоторыми его функциями.

РЕДАКТИРОВАТЬ: Конечно, это больше, чем этот сервер. Существует основной поток, который получает пакеты и отправляет их в поток распределения, а также множество других вещей в фоновом режиме. Кроме того, Сервер еще далек от завершения, но я хочу, чтобы прием работал до того, как я начну заниматься другими делами.

1 Ответ

5 голосов
/ 25 февраля 2012

threading.local - это класс, представляющий локальные данные потока. Локальные данные потока - это данные, значения которых зависят от потока и, следовательно, доступны только для потока local (потока, создавшего объект local()). Другими словами, только поток, который устанавливает значение, видит значение.

Поскольку вы задали значение loc.parent в методе __init__ вашего потока обработки, но выполняемого внутри потока распределителя, локальные потоки обрабатывающих потоков будут доступны только в потоке распределителя. Поэтому он не будет работать в потоке обработки run(), так как он будет выполняться другим потоком (не распределителем).

...