Я работаю над распределенным проектом, и мне нужно отправить объект python из одного микросервиса в другой.Я использую kafka в качестве системы связи и хочу отправить подобное сообщение производителю kafka:
self.producer = KafkaProducer(bootstrap_servers=self.config.get('kafka').get('bootstrap_servers'),
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
data = {'message': {
'action': 'new_service_order',
'service_order': ServiceOrder(_service_order_id, _creation_ts, 0, order_items)
}, 'metadata_type': MessageType.notify_monitor.name}
self.producer.send('MONITOR', data)
К сожалению, я не могу прочитать это сообщение у потребителя:
consumer = KafkaConsumer(
'MONITOR',
bootstrap_servers=['172.1.1.3:9094'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
while True:
for message in consumer:
message = message.value
print('Received {}'.format(message))
Я пробовал разные способы сериализации / десериализации сообщения, изменяющего параметр 'value_deserializer' (например, mgspack и pickle), но потребитель все равно не смог прочитать сообщение.
Конечно, если я удалю поле 'service_order'все работает отлично, тогда я думаю, что проблема заключается в сериализации объектов.
Каков наилучший способ обмена объектами с кафкой?