BufferError: Local: очередь заполнена Python - PullRequest
0 голосов
/ 16 июня 2020
import logging
from confluent_kafka import Producer
import os

logger = logging.getLogger("main")

BOOTSTRAP_SERVERS = os.environ['BOOTSTRAP_SERVERS']
APPLICATION_ID = os.getenv('APPLICATION_ID', default = "nke-data-source")
RECONNECT_BACKOFF_MS = os.getenv('RECONNECT_BACKOFF_MS', default = 1000)
REQUEST_TIMEOUT_MS = os.getenv('REQUEST_TIMEOUT_MS', default = 40000)
ACKS = os.getenv('ACKS', default = "all")
RETRIES = os.getenv('RETRIES', default = 15)
RETRY_BACK_OFF = os.getenv('RETRY_BACK_OFF', default = 1000)
MAX_IN_FLIGHT_REQUESTS = os.getenv('MAX_IN_FLIGHT_REQUESTS', default = 1)
topic = os.getenv('OUTBOUND_TOPIC', default = "tti-nke-raw")

p = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS, 
    'client.id': APPLICATION_ID, 
    'reconnect.backoff.ms': RECONNECT_BACKOFF_MS,
    'request.timeout.ms': REQUEST_TIMEOUT_MS,
    'acks': ACKS,
    'retries': RETRIES,
    'retry.backoff.ms': RETRY_BACK_OFF,
    'max.in.flight.requests.per.connection': MAX_IN_FLIGHT_REQUESTS,
    'compression.type': "lz4"})

def send(key, event):
    try:
        logger.info("Sending key: [{0}] value: [{1}]".format(key, event))
        p.produce(topic=topic, value=event.encode('utf-8'), key=key)
    except Exception:
        logger.error("error sending events to kafka", exc_info=True)

Ошибка: -

Traceback (most recent call last):
BufferError: Local: Queue full
File "/app/sender.py", line 30, in send
p.produce(topic=topic, value=event.encode('utf-8'), key=key)

Может ли кто-нибудь помочь мне в этом, поскольку я новичок в python

1 Ответ

1 голос
/ 16 июня 2020

Это Queue что-то реализовано в библиотеке librdkafka (к которой confluent_kafka привязан)

Существует внутренний Queue для продукта, который принимает отчет о доставке производителя и ждет продукт, чтобы справиться с ними (в основном ничего не делая), но вам нужно запустить этот механизм прохождения очереди, который можно просто вызвать, вызвав poll

. Вы должны позвонить producer.poll(0) после каждого производства так что измените:

p.produce(topic=topic, value=event.encode('utf-8'), key=key)

на:

p.produce(topic=topic, value=event.encode('utf-8'), key=key)
p.poll(0)

Это вызовет очистку очереди, не беспокойтесь о производительности, потому что это очень простая функция, которая на самом деле не так много, как автор librdkafka писал:

вызов poll () дешевый, он не повлияет на производительность, поэтому, пожалуйста, добавьте его к своему производителю l oop.

в основном, что он делает:

вызов poll () через регулярные промежутки времени, чтобы обслуживать обратные вызовы отчета о доставке производителя. тоже

...