Ошибка многопроцессорного ведения журнала Python, qsize () растет бесконечно - PullRequest
0 голосов
/ 12 декабря 2018

Для справки: я использую django с работниками django_rq при отправке журналов в AWS Cloudwatch (подробности могут быть опущены для краткости ...).Случайно, журналы, отправленные рабочими, перестают попадать в обработчик CloudWatchLogger.Этот CloudWatchLogger обернут в оболочку multiprocessing_logging, назовем его MPLogger.

В функции emit () MPLogger, если я вызываю print (self.queue.qsize ()), я вижу, что когда начинается сбой,qsize начинает бесконечно увеличиваться в размерах, в то время как self.queue.empty () возвращает True в функции _receive (), в результате чего метод emit () CloudWatchLogger никогда больше не будет вызываться ...

Любая идеячто может быть причиной этой блокировки?... или как я могу отладить, что вызывает это?Спасибо!Буду признателен за любые отзывы.Код ниже:

В MPLogger:

# _receive() is running in a thread, self.sub_handler is CloudWatchLogger
def _receive(self):
    while not (self._is_closed and self.queue.empty()):
        try:
            record = self.queue.get(timeout=0.2)
            self.sub_handler.emit(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except EOFError:
            break
        except queue.Empty:
            pass  # This periodically checks if the logger is closed.
        except:
            traceback.print_exc(file=sys.stderr)

    self.queue.close()
    self.queue.join_thread()

def emit(self, record):
    try:
        s = self._format_record(record)
        self.queue.put_nowait(record)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        self.handleError(record)

В CloudWatchLogger:

def emit(self, message):
    cwl_message = dict(timestamp=int(message.created * 1000), message=self.format(message))

    if self.stream_name not in self.queues:
        # start _batch_sender() in a thread referencing this queue
        self.make_queue_and_thread(stream_name)
    self.queues[stream_name].put(cwl_message)

# batch_sender() in a thread
def batch_sender(self, my_queue, stream_name, send_interval, max_batch_size, max_batch_count):
    msg = None

    # See https://boto3.readthedocs.io/en/latest/reference/services/logs.html#CloudWatchLogs.Client.put_log_events
    while msg != self.END:
        cur_batch = [] if msg is None or msg == self.FLUSH else [msg]
        cur_batch_size = sum(size(msg) for msg in cur_batch)
        cur_batch_msg_count = len(cur_batch)
        cur_batch_deadline = time.time() + send_interval
        while True:
            try:
                msg = my_queue.get(block=True, timeout=max(0, cur_batch_deadline-time.time()))
                if size(msg) > max_batch_size:
                    msg = truncate(msg)
            except queue.Empty:
                # If the queue is empty, we don't want to reprocess the previous message
                msg = None
            if msg is None \
               or msg == self.END \
               or msg == self.FLUSH \
               or cur_batch_size + size(msg) > max_batch_size \
               or cur_batch_msg_count >= max_batch_count \
               or time.time() >= cur_batch_deadline:
                self._submit_batch(cur_batch, stream_name)
                if msg is not None:
                    # We don't want to call task_done if the queue was empty and we didn't receive anything new
                    my_queue.task_done()
                break
            elif msg:
                cur_batch_size += size(msg)
                cur_batch_msg_count += 1
                cur_batch.append(msg)
                my_queue.task_done()
...