Создание будущего асинхронного потокового извлечения блокирует дальнейшее создание объекта ресурса для листов api v4 - PullRequest
0 голосов
/ 28 мая 2019

У меня есть несколько процессов, порожденных многопроцессорностью для поддержки моего бота google.chat.Один из них прослушивает сообщения, используя запрос StreamingPull от API pubsub, и помещает входящие сообщения в messagesMessagesQueue.

Другой принимает сообщения из входящего сообщенияMessagesQueue и передает их обработчикам.Некоторые из обработчиков пишут в Gsheets.Проблема заключается в том, что

  1. будущее инициализируется до того, как какие-либо сообщения попадают в очередь, когда объект-обработчик пытается создать объект ресурса с помощью apiclient.discovery.build ('sheet', 'v4 ', credentials = self.credentials, requestBuilder = self.build_request) Эта строка застряла навсегда, исключение не выдается.Похоже, что где-то происходит блокировка потока.

  2. Когда я сначала помещаю некоторые сообщения в ReceiveMessagesQueue, а затем запускаю процесс прослушивания, который использует будущее -> все работает отлично.Все объекты Gsheet Resource идеально созданы.

Кажется, что где-то отсутствует блокировка потока, и я не могу понять, где.Конечно, я могу использовать сценарий №2, чтобы заставить вещи работать, но ДЕЙСТВИТЕЛЬНО хочу понять, что происходит.

К вашему сведению.Все мои объекты Resource являются поточно-ориентированными, рекомендованными Google.Когда http транспорт создается независимо для каждого потока.

Класс, который слушает сообщения бота:

class MessageHandlerBase(object):
    def __init__(self, project_id, subscription_name, pathToSecretsFile, **kwargs):

        self.project_id = project_id
        self.subscription_name = subscription_name
        self.futureExecutes = Event()
        #some other logging code here

    def startListening(self, callback):
        raw_credentials = service_account.Credentials.from_service_account_file(self.secrets['google_API'])
        self.subscriber = pubsub.SubscriberClient(credentials = raw_credentials)
        self.subscription_path = self.subscriber.subscription_path(self.project_id, self.subscription_name)

        self.future = self.subscriber.subscribe(self.subscription_path, callback=callback)
        self.futureExecutes.set()

    def stop(self):
        if self.futureExecutes.is_set():
            self.future.cancel()

обратный вызов MessageHandler

def putIncomingMessageToQueue(logQueue):
    def f(message):
        incomigMessagesQueue.put(json.loads(message.data))
        logger.info(u'Put message to Queue ' +  unicode(json.loads(message.data)))
        message.ack() 
    return f

Обработчик, который залипает образец:

class registrationHandler(object):
    def __init__(self, **kwargs):
        pathToSecretsFile = kwargs.get('settings')

        # messages queue that later on will be handled by main process
        self.messagesQueue = kwargs.get('messagesQueue')


        self.logger.info('i am in the registration handler init 2')
        try:
            self.databaseSheet = DynamicSpreadSheet(spreadsheet=self.secrets['bot_config_sheet_id'], sheet_name='bot users', Credentials_file_name=self.secrets['google_API'], **{'logQueue':logQueue})
        except Exception, e:
            self.logger.info('registrationHandler: failed to init a Spreadsheet')
            self.logger.info("".join(traceback.format_exception(*sys.exc_info())))
        self.logger.info('message handler created')

Создание DynamicSpreadSheet навсегда заблокировано на этапе инициализации, когда вызывается обнаружение apiclient.

class DynamicSpreadSheet(object):
    def __init__(self, spreadsheet, sheet_name, Credentials_file_name, **kwargs):
        raw_credentials = service_account.Credentials.from_service_account_file(Credentials_file_name)

        self.credentials = raw_credentials.with_scopes(['https://www.googleapis.com/auth/spreadsheets','https://www.googleapis.com/auth/drive'])
        self.service = apiclient.discovery.build('sheets', 'v4', credentials = self.credentials, requestBuilder=self.build_request) 
        self.sheet_name = sheet_name

Так что если вы звоните вот так

ProcessHandleMessages = Process(group=None, target = HandleMessages)
incomingMessagesListener = MessageHandlerBase(project_id, subscription_name, 'config/secrets.txt', **{'logQueue': logQueue})

ProcessHandleMessages.start()
incomingMessagesListener.startListening(callback = putIncomingMessageToQueue(logQueue))

и пишете сообщениек боту, который должен перейти в обработчик -> все застряло

Но если вы сделаете

ProcessHandleMessages = Process(group=None, target = HandleMessages)
ProcessHandleMessages.start()

incomingMessagesQueue.put(message)

incomingMessagesListener = MessageHandlerBase(project_id, subscription_name, 'config/secrets.txt', **{'logQueue': logQueue})
incomingMessagesListener.startListening(callback = putIncomingMessageToQueue(logQueue))

, тогда все остальные сообщения, поступающие от бота, будут правильно использованы

Я на Python 2.7 (не спрашивайте меня, почему:)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...