Я создал простой код с использованием многопоточности в python с очередью. У меня есть основной поток, который продолжает добавлять данные в очередь (размер очереди составляет 2000), и будет 5 различных потоков, которые вынут из очереди и опубликуют sh в redis на каком-то конкретном c канале.
Код работает отлично, но через 5 или 6 часов механизм publi sh замедляется. Поскольку потоки, которые используются для удаления данных из очереди, становятся медленными и начинают выбрасывать буфер из-за ошибки потока, когда размер очереди достигает максимального значения. скорость добавления данных в очередь такая же, какая была в начале.
Проблема возникает по-разному в другой конфигурации Linux система. Как определить, какую ошибку выдает? Как отладить проблему.
Как указано - код очень прост: основной поток необходим для добавления данных в очередь один за другим, а 5 других потоков могут вывести данные из очереди один. по одному.
Совместное использование кода
import redis
import logging
import sys
logging.basicConfig(level = logging.DEBUG)
redisPub = redis.StrictRedis(host='127.0.0.1', port=6379)
def main():
try:
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
recv_sock.bind(("", 8000))
recv_sock.setblocking(0)
recv_sock.settimeout(5)
except Exception,e:
exc_type, exc_obj, exc_tb = sys.exc_info()
logging.error("Socket Connection Unsuccessful")
print "Program halted."
sys.exit()
sendThEvent = threading.Event()
sendThEvent.clear()
sendPktQ = Queue.Queue(maxsize=2000)
for i in range(0,5):
thSend = threading.Thread(name = 'sendThread', target = sendThread , args = (sendPktQ,sendThEvent))
thSend.setDaemon(True)
thSend.start()
packetsCounting = 0
while True:
try:
recvData, recvAddr = recv_sock.recvfrom(2048)
sendPktQ.put(recvData)
packetsCounting += 1
else:
logging.error("There is some error")
except socket.timeout:
continue
except Exception,e:
exc_type, exc_obj, exc_tb = sys.exc_info()
logging.error(e)
def sendThread(sendPktQ,listenEvent):
if sendPktQ == None:
pass
else:
while True:
instanceSentCnt += 1
if sendPktQ.qsize() < 1:
event_is_set = listenEvent.wait(0)
packetDict = sendPktQ.get()
data = redisPub.publish('chnlName', packetToSend)
logging.info("packet size reached to ============================================ %s ------------ %s"%(len(packetToSend),data))
if __name__ == "__main__":
main()
Любой вход будет высоко оценен.