Отправить объект Python с Кафкой - PullRequest
2 голосов
/ 20 марта 2019

Я работаю над распределенным проектом, и мне нужно отправить объект python из одного микросервиса в другой.Я использую kafka в качестве системы связи и хочу отправить подобное сообщение производителю kafka:

self.producer = KafkaProducer(bootstrap_servers=self.config.get('kafka').get('bootstrap_servers'),
                                  value_serializer=lambda m: json.dumps(m).encode('utf-8'))
data = {'message': {
        'action': 'new_service_order',
        'service_order': ServiceOrder(_service_order_id, _creation_ts, 0, order_items)
    }, 'metadata_type': MessageType.notify_monitor.name}


    self.producer.send('MONITOR', data)

К сожалению, я не могу прочитать это сообщение у потребителя:

consumer = KafkaConsumer(
'MONITOR',
bootstrap_servers=['172.1.1.3:9094'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))

while True:
for message in consumer:
    message = message.value
    print('Received {}'.format(message))

Я пробовал разные способы сериализации / десериализации сообщения, изменяющего параметр 'value_deserializer' (например, mgspack и pickle), но потребитель все равно не смог прочитать сообщение.

Конечно, если я удалю поле 'service_order'все работает отлично, тогда я думаю, что проблема заключается в сериализации объектов.

Каков наилучший способ обмена объектами с кафкой?

...