Я использую пакет confluent_kafka для работы с Kafka.
Я создаю тему таким образом:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
def my_producer():
bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']
value_schema = avro.load('/home/ValueSchema.avsc')
avroProducer = AvroProducer({
'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'schema.registry.url':'http://my_adress.com:8081',
},
default_value_schema=value_schema
)
for i in range(0, 25000):
value = {"name":"Yuva","favorite_number":10,"favorite_color":"green","age":i*2}
avroProducer.produce(topic='my_topik14', value=value)
avroProducer.flush(0)
print('Finished!')
if __name__ == '__main__':
my_producer()
Это работает. (кстати, получим 24820 сообщений вместо 25000 ...)
Мы можем проверить это:
kafka-run-class kafka.tools.GetOffsetShell --broker-list my_adress.com:9092 --topic my_topik14
my_topik14:0:24819
Теперь я хочу потреблять:
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']
c = AvroConsumer(
{'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'group.id': 'avroneversleeps',
'schema.registry.url': 'http://my_adress.com:8081',
'api.version.request': True,
'fetch.min.bytes': 100000,
'consume.callback.max.messages':1000,
'batch.num.messages':2
})
c.subscribe(['my_topik14'])
running = True
while running:
msg = None
try:
msg = c.poll(0.1)
if msg:
if not msg.error():
print(msg.value())
c.commit(msg)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
else:
print("No Message!! Happily trying again!!")
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
c.commit()
c.close()
Но есть проблема:
Я читаю сообщения только одно за другим.
У меня вопрос как читать партию сообщений?
Я попробовал разные параметры в Consumer config, но они ничего не сделали!
Также я нашел этот вопрос на SO и попробовал те же параметры - он все еще не работает.
Также прочитайте это. Но это против предыдущей ссылки ...