Производитель Кафки в другом процессе, кажется, не работает в Python - PullRequest
0 голосов
/ 04 марта 2019

Я использую микросервис в python, который реализует несколько обработчиков для каждого типа сообщений.

class MsFeatureLandmark(BaseMicroservice):

    def __init__(self):
        self.config = safe_load(open(sys.argv[1]))
        client = MongoClient(self.config.get('mongo').get('connection_url'))
        database = client[self.config.get('mongo').get('mongo_db')]
        self.dict = {
            MessageType.create_model.name: ComputeLocDescHandler(self.config),
            MessageType.detect_all.name: ExtractFeatureHandler(self.config, database),
            MessageType.detect_landmark.name: ExtractFeatureHandler(self.config, database)

        }
        super().__init__(self.dict, self.config.get('kafka'))

    def on_message_received(self, generic_message):
        # self.dict.get(generic_message.metadata_type).handle(generic_message.message)
        p = Process(target=self.dict.get(generic_message.metadata_type).handle, args=(generic_message.message,))
        p.daemon = True
        p.start()


MsFeatureLandmark().run()

Для каждого полученного сообщения я запускаю соответствующий метод .handle ().

В конце вычисления (которое включает в себя тензорный поток) я использую методы, унаследованные от суперкласса, для отправки сообщения с использованием kafka

def write_message(self, message):
    if(self.is_prod_init):
        output_topic = self.config.get('kafka').get('output_topic')
        cl.logging.info("Sending on " + output_topic + " message: " + str(message))
        self.producer.send(output_topic, message)
    else:
        raise ValueError('Producer is not initialized.')

def init_producer(self):
    self.producer = KafkaProducer(bootstrap_servers=self.config.get('kafka').get('bootstrap_servers'),
                                  value_serializer=lambda m: json.dumps(m).encode('utf-8'))
    self.is_prod_init = True
    self.producer.flush()

Когда я запускаю весь этот код синхронизированным способом (безс использованием Process (target = self. etc.) все работает правильно, но производитель использует тайм-аут, поскольку обработка данных занимает слишком много времени.

Если я запускаю его с помощью Process, я не получаю сообщений об ошибках, нокажется, что производитель по какой-либо причине не производит никакого сообщения.

Что мне не хватает?

РЕДАКТИРОВАТЬ: По какой-то причине, когда я в последний раз запускаю этот микросервис, возникла исключительная ситуация, и потребитель(который запускается в другом процессе) успешно прочитал сообщение. Это заставило меня задуматься: если я вызову исключение, будет ли отправлено сообщение? Да.

    self.write_message(message)
    raise Exception('spakkiggnustel')

добавление этой последней строки «решило» проблему.Зачем?Я запутался больше, чем был до

...