Не в состоянии получить несколько записей через опрос API в клиенте Python Confluent Kafka - PullRequest
0 голосов
/ 26 декабря 2018

Я использую библиотеку Python Confluent kafka для приема сообщений, я хочу получить несколько записей в опросе.но почему-то я получаю только одну запись за раз.

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'mygroup',
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test-topic'])
while True:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        continue
    else:
        # Proper message
        print('%s [%d] at offset %d with key %s:' %
                         (msg.topic(), msg.partition(), msg.offset(),
                          str(msg.key())))

        msg = msg.value().decode('utf-8')
        print('Received message: {}'.format(msg))

Есть ли какой-либо параметр / параметр, который мне нужно передать в опрос, или есть какой-либо другой API, чтобы я мог получить несколько записей в одном опросе?звоните.

1 Ответ

0 голосов
/ 27 декабря 2018

Я обнаружил использовать API, который возвращает несколько записей.

Параметры: num_messages (int) - Максимальное количество возвращаемых сообщений (по умолчанию: 1).

timeout (float) - Максимальное время, чтобы заблокировать ожидание сообщения, события или обратного вызова (по умолчанию: бесконечно (-1)).(Секунды)

Возвращает: список объектов сообщений (возможно, пустой по таймауту)

from confluent_kafka import Consumer, KafkaException

consumer = Consumer({
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'mygroup',
    'session.timeout.ms': 6000,
    "enable.auto.commit": "false",
    'auto.offset.reset': 'earliest'
})

emptyPollLimit = 10
emptyPollCounter = 0

try:
    consumer.subscribe(['test-topic'])
    records = consumer.consume(timeout=1, num_messages=20)
    while emptyPollCounter < emptyPollLimit:
        print(records)
        if not records:
            print("no record found retrying {} times".format(emptyPollCounter + 1))
            emptyPollCounter += 1

        else:
            print("total records read: {}".format(len(records)))
            for records in records:
                print('topic: %s partition:%d at offset: %d with key: %s:' %
                      (records.topic(), records.partition(), records.offset(),
                       str(records.key())))
                records = records.value().decode('utf-8')
                print('Received message value: {}'.format(records))
        consumer.commit(async=False)
        records = consumer.consume(timeout=1, num_messages=20)

except KafkaException as ex:
    print(ex)

finally:
    # Close down consumer to commit final offsets.
    consumer.close()
...