В настоящее время я программирую сервер дейтаграмм на основе python, используя потоки и все такое.
Я столкнулся со следующей проблемой: я использую несколько потоков распределения для распределения входящих пакетов по различным потокам обработки. Внутри потоков обработки я использую threading.local () для отслеживания локальных переменных потока.
В настоящее время я тестирую, как мой сервер реагирует на высокую нагрузку (2000 пакетов за ~ 2 секунды), и натолкнулся на странное поведение local () - Object.
Кажется, какое-то время все работает нормально, а затем в какой-то момент выдает исключение:
Exception in thread 192.168.1.102: # <-- This is the Processing Thread
Traceback (most recent call last):
File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'
# The following three lines are debug
# This is the Allocation thread that has called the Processing thread
<Thread(192.168.1.102, started 1106023568)>
# This confirms that the queue it tries to access exists
<Queue.Queue instance at 0x40662b48>
# This is the Processing thread, which has stopped executing on exception
<Thread(192.168.1.102, stopped 1106023568)>
Exception in thread pyAlloc-0: # <-- This is the Allocation thread
Traceback (most recent call last):
File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
foundThread.addTask(str(data))
File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'
часть обрабатывающей нити
# Imports
import threading
import time
import Queue
class Thread(threading.Thread):
def __init__(self,pThread):
threading.Thread.__init__(self)
self.loc = threading.local()
self.loc.todo = Queue.Queue()
self.loc.father = pThread
print "Konstruktor ausgefuehrt"
print self.loc.todo
def run(self):
self.loc.run = True;
print self.loc.father
while (self.loc.run):
try:
task = self.loc.todo.get(True, 1)
print "processing..."
if(task == self):
self.loc.run=False
else:
print task
except Queue.Empty:
pass
self.loc.father.threadTerminated(self)
print self.name, "terminating..."
def addTask(self, pTask):
print self
print self.loc.todo
self.loc.todo.put(pTask)
И поток выделения:
import threading
import pyProcThread # My processing Thread
import Queue
import time
class Thread(threading.Thread):
# Lock-Objects
threadListLock = threading.Lock()
waitListLock = threading.Lock()
alive = True;
taskQueue = Queue.Queue()
# Lists
# List of all running threads
threads = []
def threadExists(self,pIP):
"""Checks if there is already a thread with the given Name"""
for x in self.threads:
if x.name == pIP:
return x
return None
def threadTerminated(self,pThread):
"""Called when a Processing Thread terminates"""
with self.threadListLock:
self.threads.remove(pThread)
print "Thread removed"
def threadRegistered(self,pThread):
"""Registers a new Thread"""
self.threads.append(pThread)
def killThread(self):
self.alive = False
def run(self):
while(self.alive):
# print "Verarbeite Nachricht ", self.Message
# print "Von ", self.IP
try:
data, addtemp = self.taskQueue.get(True, 1)
addr, _junk = addtemp
with self.threadListLock:
foundThread=self.threadExists(str(addr))
# print "Thread " + self.name + " verarbeitet " + data
if (foundThread!=None):
#print "recycling thread"
foundThread.addTask(str(data))
else:
print "running new Thread"
t = pyProcThread.Thread(self)
t.name = str(addr)
t.addTask(str(data))
t.start()
self.threadRegistered(t)
self.taskQueue.task_done()
except Queue.Empty:
pass
print self.name, "terminating..."
with self.threadListLock:
for thread in self.threads:
thread.addTask(thread)
Полный вывод, включая все отладочные отпечатки:
running new Thread
Konstruktor ausgefuehrt
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, initial)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
Exception in thread 192.168.1.102:
Traceback (most recent call last):
File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 37, in run
print self.loc.father
AttributeError: 'thread._local' object has no attribute 'father'
<Thread(192.168.1.102, started 1106023568)>
<Queue.Queue instance at 0x40662b48>
<Thread(192.168.1.102, stopped 1106023568)>
Exception in thread pyAlloc-0:
Traceback (most recent call last):
File "/opt/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/volume1/home/Max/Python/MyThread/pyAllocThread.py", line 60, in run
foundThread.addTask(str(data))
File "/volume1/home/Max/Python/MyThread/pyProcThread.py", line 58, in addTask
print self.loc.todo
AttributeError: 'thread._local' object has no attribute 'todo'
Terminating main
192.168.1.102
DEINEMUDDA sent to 192.168.1.102 : 8082
pyAlloc-1 terminating...
<Thread(192.168.1.102, stopped 1106023568)>
<Queue.Queue instance at 0x40662b48>
Как видно из журнала, какое-то время все работает нормально, хотя оно никогда не достигает значения print "processing
в основной функции потока обработки.
Я искал в Интернете и StackOverflow похожие проблемы, но не смог их найти. Заранее благодарен за любую помощь и извините за мой стиль кодирования. Я программирую на Python всего несколько дней и до сих пор изучаю веревки с некоторыми его функциями.
РЕДАКТИРОВАТЬ: Конечно, это больше, чем этот сервер. Существует основной поток, который получает пакеты и отправляет их в поток распределения, а также множество других вещей в фоновом режиме. Кроме того, Сервер еще далек от завершения, но я хочу, чтобы прием работал до того, как я начну заниматься другими делами.