Для справки: я использую django с работниками django_rq при отправке журналов в AWS Cloudwatch (подробности могут быть опущены для краткости ...).Случайно, журналы, отправленные рабочими, перестают попадать в обработчик CloudWatchLogger.Этот CloudWatchLogger обернут в оболочку multiprocessing_logging, назовем его MPLogger.
В функции emit () MPLogger, если я вызываю print (self.queue.qsize ()), я вижу, что когда начинается сбой,qsize начинает бесконечно увеличиваться в размерах, в то время как self.queue.empty () возвращает True в функции _receive (), в результате чего метод emit () CloudWatchLogger никогда больше не будет вызываться ...
Любая идеячто может быть причиной этой блокировки?... или как я могу отладить, что вызывает это?Спасибо!Буду признателен за любые отзывы.Код ниже:
В MPLogger:
# _receive() is running in a thread, self.sub_handler is CloudWatchLogger
def _receive(self):
while not (self._is_closed and self.queue.empty()):
try:
record = self.queue.get(timeout=0.2)
self.sub_handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except queue.Empty:
pass # This periodically checks if the logger is closed.
except:
traceback.print_exc(file=sys.stderr)
self.queue.close()
self.queue.join_thread()
def emit(self, record):
try:
s = self._format_record(record)
self.queue.put_nowait(record)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
В CloudWatchLogger:
def emit(self, message):
cwl_message = dict(timestamp=int(message.created * 1000), message=self.format(message))
if self.stream_name not in self.queues:
# start _batch_sender() in a thread referencing this queue
self.make_queue_and_thread(stream_name)
self.queues[stream_name].put(cwl_message)
# batch_sender() in a thread
def batch_sender(self, my_queue, stream_name, send_interval, max_batch_size, max_batch_count):
msg = None
# See https://boto3.readthedocs.io/en/latest/reference/services/logs.html#CloudWatchLogs.Client.put_log_events
while msg != self.END:
cur_batch = [] if msg is None or msg == self.FLUSH else [msg]
cur_batch_size = sum(size(msg) for msg in cur_batch)
cur_batch_msg_count = len(cur_batch)
cur_batch_deadline = time.time() + send_interval
while True:
try:
msg = my_queue.get(block=True, timeout=max(0, cur_batch_deadline-time.time()))
if size(msg) > max_batch_size:
msg = truncate(msg)
except queue.Empty:
# If the queue is empty, we don't want to reprocess the previous message
msg = None
if msg is None \
or msg == self.END \
or msg == self.FLUSH \
or cur_batch_size + size(msg) > max_batch_size \
or cur_batch_msg_count >= max_batch_count \
or time.time() >= cur_batch_deadline:
self._submit_batch(cur_batch, stream_name)
if msg is not None:
# We don't want to call task_done if the queue was empty and we didn't receive anything new
my_queue.task_done()
break
elif msg:
cur_batch_size += size(msg)
cur_batch_msg_count += 1
cur_batch.append(msg)
my_queue.task_done()