Невозможно получить ключ в Kafka Python Consumer - PullRequest
0 голосов
/ 21 мая 2018

Я написал код, который будет генерировать сообщения в ключ-значение в теме:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))

deviceId = "4bc03533ccc94065"
responseId = "c03c4851-701f-4265-aafd-eb133c09c08e"

print deviceId
print responseId

producer.send('collect-response-devices', {'deviceId': deviceId})
producer.send('collect-response-responses', {'responseId' : responseId})

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    print excp
    # handle exception

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)

Однако мой потребитель будет использовать сообщения, ничего не назначая (None в key)и все в части стоимости.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['collect-response-devices'])
for message in consumer:
  print (message.key, message.value)

Это вывод потребителя:

(None, {u'deviceId ': u'4bc03533ccc94065'})

1 Ответ

0 голосов
/ 21 мая 2018

Для документов вам необходимо указать ключ при создании сообщения:

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

В данный момент вы не указываете ключ при создании сообщения,следовательно, получая None

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...