Kafka Consumer: Как прочитать поле c Avro в Python? - PullRequest
0 голосов
/ 11 марта 2020

В приведенном ниже фрагменте кода потребителя я могу получить отправленные данные. Как мне получить доступ к определенным значениям из всех данных для работы с.

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import (SerializerError,
                                             KeySerializerError,
                                             ValueSerializerError)

***
***
***

c.subscribe(['Topic'])

while True:
    try:
        msg = c.poll(10)
        print(msg)

Спасибо

Ответы [ 2 ]

1 голос
/ 12 марта 2020

На самом деле это можно сделать двумя способами:

msg.value()['myFieldName']

или

msg.value().get('myFieldName')

Например,

c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'localhost:8081',
    'group.id': 'test-group'
})


c.subscribe(['Topic'])

while True:
    try:
        msg = c.poll(10)

        if msg:
            print(f"field1 Value: {msg.value()['field1']}")
            print(f"field2 Value: {msg.value().get('field2')}")

        else: 
            pass
    except SerializerError as e:
        print(f"Message deserialization failed for message {msg}:\n{e}")
1 голос
/ 12 марта 2020

Я вижу, что вы импортируете AvroConsumer, поэтому у вас будет

c.value()['field'] 
...