У меня есть несколько процессов, порожденных многопроцессорностью для поддержки моего бота google.chat.Один из них прослушивает сообщения, используя запрос StreamingPull от API pubsub, и помещает входящие сообщения в messagesMessagesQueue.
Другой принимает сообщения из входящего сообщенияMessagesQueue и передает их обработчикам.Некоторые из обработчиков пишут в Gsheets.Проблема заключается в том, что
будущее инициализируется до того, как какие-либо сообщения попадают в очередь, когда объект-обработчик пытается создать объект ресурса с помощью apiclient.discovery.build ('sheet', 'v4 ', credentials = self.credentials, requestBuilder = self.build_request) Эта строка застряла навсегда, исключение не выдается.Похоже, что где-то происходит блокировка потока.
Когда я сначала помещаю некоторые сообщения в ReceiveMessagesQueue, а затем запускаю процесс прослушивания, который использует будущее -> все работает отлично.Все объекты Gsheet Resource идеально созданы.
Кажется, что где-то отсутствует блокировка потока, и я не могу понять, где.Конечно, я могу использовать сценарий №2, чтобы заставить вещи работать, но ДЕЙСТВИТЕЛЬНО хочу понять, что происходит.
К вашему сведению.Все мои объекты Resource являются поточно-ориентированными, рекомендованными Google.Когда http транспорт создается независимо для каждого потока.
Класс, который слушает сообщения бота:
class MessageHandlerBase(object):
def __init__(self, project_id, subscription_name, pathToSecretsFile, **kwargs):
self.project_id = project_id
self.subscription_name = subscription_name
self.futureExecutes = Event()
#some other logging code here
def startListening(self, callback):
raw_credentials = service_account.Credentials.from_service_account_file(self.secrets['google_API'])
self.subscriber = pubsub.SubscriberClient(credentials = raw_credentials)
self.subscription_path = self.subscriber.subscription_path(self.project_id, self.subscription_name)
self.future = self.subscriber.subscribe(self.subscription_path, callback=callback)
self.futureExecutes.set()
def stop(self):
if self.futureExecutes.is_set():
self.future.cancel()
обратный вызов MessageHandler
def putIncomingMessageToQueue(logQueue):
def f(message):
incomigMessagesQueue.put(json.loads(message.data))
logger.info(u'Put message to Queue ' + unicode(json.loads(message.data)))
message.ack()
return f
Обработчик, который залипает образец:
class registrationHandler(object):
def __init__(self, **kwargs):
pathToSecretsFile = kwargs.get('settings')
# messages queue that later on will be handled by main process
self.messagesQueue = kwargs.get('messagesQueue')
self.logger.info('i am in the registration handler init 2')
try:
self.databaseSheet = DynamicSpreadSheet(spreadsheet=self.secrets['bot_config_sheet_id'], sheet_name='bot users', Credentials_file_name=self.secrets['google_API'], **{'logQueue':logQueue})
except Exception, e:
self.logger.info('registrationHandler: failed to init a Spreadsheet')
self.logger.info("".join(traceback.format_exception(*sys.exc_info())))
self.logger.info('message handler created')
Создание DynamicSpreadSheet навсегда заблокировано на этапе инициализации, когда вызывается обнаружение apiclient.
class DynamicSpreadSheet(object):
def __init__(self, spreadsheet, sheet_name, Credentials_file_name, **kwargs):
raw_credentials = service_account.Credentials.from_service_account_file(Credentials_file_name)
self.credentials = raw_credentials.with_scopes(['https://www.googleapis.com/auth/spreadsheets','https://www.googleapis.com/auth/drive'])
self.service = apiclient.discovery.build('sheets', 'v4', credentials = self.credentials, requestBuilder=self.build_request)
self.sheet_name = sheet_name
Так что если вы звоните вот так
ProcessHandleMessages = Process(group=None, target = HandleMessages)
incomingMessagesListener = MessageHandlerBase(project_id, subscription_name, 'config/secrets.txt', **{'logQueue': logQueue})
ProcessHandleMessages.start()
incomingMessagesListener.startListening(callback = putIncomingMessageToQueue(logQueue))
и пишете сообщениек боту, который должен перейти в обработчик -> все застряло
Но если вы сделаете
ProcessHandleMessages = Process(group=None, target = HandleMessages)
ProcessHandleMessages.start()
incomingMessagesQueue.put(message)
incomingMessagesListener = MessageHandlerBase(project_id, subscription_name, 'config/secrets.txt', **{'logQueue': logQueue})
incomingMessagesListener.startListening(callback = putIncomingMessageToQueue(logQueue))
, тогда все остальные сообщения, поступающие от бота, будут правильно использованы
Я на Python 2.7 (не спрашивайте меня, почему:)