Сообщения не сбрасываются в кафку - PullRequest
1 голос
/ 20 июня 2020

У меня есть topi c с 5 разделами (A) и 5 ​​потребителями, каждый из которых извлекает сообщения из одного раздела. Выполняет некоторую обработку (занимает около 30 секунд) и перемещает в две темы по 1 разделу в каждой (Topi c -B1, TopicB2). Топи c -B1 - это топи, уплотненный бревном c

kafka- python == 2.0.1

python3 .8

producer = KafkaProducer(bootstrap_servers = cluster_ip, acks = -1, retries = 3)
#acks = -1 is similar to acks = all

Я добавил обратный вызов, ошибку при отправке

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
producer.flush()

Consumer Snippet-

while True:
    raw_messages = consumer.poll(timeout_ms=20000, max_records=2)
    for topic_partition, messages in raw_messages.items():
        for msg in messages:
            try:
                #process msg .....
                data1, data2 = some_func(msg.value)
                producer.send(Topic-B1, key = msg.key, value = data1).add_callback(on_send_success).add_errback(on_send_error)
                producer.flush()
                producer.send(Topic-B2, key = msg.key, value = data2).add_callback(on_send_success).add_errback(on_send_error)
                producer.flush()
                consumer.commit()
            except Exception as e:
                log.error('I am an errback', exc_info=str(e))

Но очень немногие сообщения не были отправлены в Topi c -B1 , Топи c -B2. Я не вижу журналов или ошибок у потребителей. Как убедиться, что сообщения отправлены точно, или как вызвать исключение в случае сбоя.

...