Я использую микросервис в python, который реализует несколько обработчиков для каждого типа сообщений.
class MsFeatureLandmark(BaseMicroservice):
def __init__(self):
self.config = safe_load(open(sys.argv[1]))
client = MongoClient(self.config.get('mongo').get('connection_url'))
database = client[self.config.get('mongo').get('mongo_db')]
self.dict = {
MessageType.create_model.name: ComputeLocDescHandler(self.config),
MessageType.detect_all.name: ExtractFeatureHandler(self.config, database),
MessageType.detect_landmark.name: ExtractFeatureHandler(self.config, database)
}
super().__init__(self.dict, self.config.get('kafka'))
def on_message_received(self, generic_message):
# self.dict.get(generic_message.metadata_type).handle(generic_message.message)
p = Process(target=self.dict.get(generic_message.metadata_type).handle, args=(generic_message.message,))
p.daemon = True
p.start()
MsFeatureLandmark().run()
Для каждого полученного сообщения я запускаю соответствующий метод .handle ().
В конце вычисления (которое включает в себя тензорный поток) я использую методы, унаследованные от суперкласса, для отправки сообщения с использованием kafka
def write_message(self, message):
if(self.is_prod_init):
output_topic = self.config.get('kafka').get('output_topic')
cl.logging.info("Sending on " + output_topic + " message: " + str(message))
self.producer.send(output_topic, message)
else:
raise ValueError('Producer is not initialized.')
def init_producer(self):
self.producer = KafkaProducer(bootstrap_servers=self.config.get('kafka').get('bootstrap_servers'),
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
self.is_prod_init = True
self.producer.flush()
Когда я запускаю весь этот код синхронизированным способом (безс использованием Process (target = self. etc.) все работает правильно, но производитель использует тайм-аут, поскольку обработка данных занимает слишком много времени.
Если я запускаю его с помощью Process, я не получаю сообщений об ошибках, нокажется, что производитель по какой-либо причине не производит никакого сообщения.
Что мне не хватает?
РЕДАКТИРОВАТЬ: По какой-то причине, когда я в последний раз запускаю этот микросервис, возникла исключительная ситуация, и потребитель(который запускается в другом процессе) успешно прочитал сообщение. Это заставило меня задуматься: если я вызову исключение, будет ли отправлено сообщение? Да.
self.write_message(message)
raise Exception('spakkiggnustel')
добавление этой последней строки «решило» проблему.Зачем?Я запутался больше, чем был до