Потоки в python и очереди приводят к утечке памяти или ошибке памяти - PullRequest
0 голосов
/ 09 марта 2020

Я создал простой код с использованием многопоточности в python с очередью. У меня есть основной поток, который продолжает добавлять данные в очередь (размер очереди составляет 2000), и будет 5 различных потоков, которые вынут из очереди и опубликуют sh в redis на каком-то конкретном c канале.

Код работает отлично, но через 5 или 6 часов механизм publi sh замедляется. Поскольку потоки, которые используются для удаления данных из очереди, становятся медленными и начинают выбрасывать буфер из-за ошибки потока, когда размер очереди достигает максимального значения. скорость добавления данных в очередь такая же, какая была в начале.

Проблема возникает по-разному в другой конфигурации Linux система. Как определить, какую ошибку выдает? Как отладить проблему.

Как указано - код очень прост: основной поток необходим для добавления данных в очередь один за другим, а 5 других потоков могут вывести данные из очереди один. по одному.

Совместное использование кода

   import redis
   import logging
   import sys
   logging.basicConfig(level = logging.DEBUG)
   redisPub = redis.StrictRedis(host='127.0.0.1', port=6379)

def main():
    try:
        recv_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
        recv_sock.bind(("", 8000))
        recv_sock.setblocking(0)
        recv_sock.settimeout(5)
    except Exception,e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        logging.error("Socket Connection Unsuccessful")
        print "Program halted."
        sys.exit()

    sendThEvent     = threading.Event()
    sendThEvent.clear()
    sendPktQ = Queue.Queue(maxsize=2000)
    for i in range(0,5):
        thSend = threading.Thread(name = 'sendThread', target = sendThread , args = (sendPktQ,sendThEvent))
        thSend.setDaemon(True)
        thSend.start()

    packetsCounting = 0
    while True:
        try:
            recvData, recvAddr = recv_sock.recvfrom(2048)
            sendPktQ.put(recvData)
            packetsCounting += 1

            else:
                logging.error("There is some error")
        except socket.timeout:
            continue
        except Exception,e:
            exc_type, exc_obj, exc_tb = sys.exc_info()
            logging.error(e)



def sendThread(sendPktQ,listenEvent):
    if sendPktQ == None:
        pass
    else:
        while True:
            instanceSentCnt += 1
            if sendPktQ.qsize() < 1:
                event_is_set = listenEvent.wait(0)

            packetDict = sendPktQ.get()
            data = redisPub.publish('chnlName', packetToSend)
            logging.info("packet size reached to ============================================ %s ------------ %s"%(len(packetToSend),data))



if __name__ == "__main__":
    main()

Любой вход будет высоко оценен.

1 Ответ

0 голосов
/ 19 апреля 2020

Я вижу sendPktQ.get(), но не sendPktQ.task_done(), например:

async def _dispatch_packet(self, destination=None) -> None:
    """Send a command unless in listen_only mode."""
    if not self.command_queue.empty():
        cmd = self.command_queue.get()

        if not (destination is None or self.config.get("listen_only")):
            destination.write(bytearray(f"{cmd}\r\n".encode("ascii")))
            await asyncio.sleep(0.05)

        self.command_queue.task_done()

См. документы библиотеки очередей

...