Доступ к общей общей переменной между двумя сопрограммами - PullRequest
1 голос
/ 14 июня 2019

Я работаю на Python3.6 с GCP PubSub.

  1. Я получаю ответный звонок от PubSub для подписанной темы.

  2. После получения сообщения обратного вызова я поставил сообщение в очередь и ACK

  3. Я обработал полученное сообщение в init_apns_process ()

Будущая проблема:

Если я мгновенно принимаю ответный вызов (что я и делаю), и хотя processing(init_apns_process) произошло исключение, я потеряю свое сообщение с любого конца.

Expectation:

Не отправлять ACK PubSub до завершения init_apns_process().

Мое решение до сих пор:

apns_msg_queue = queue.Queue()
start_time = 0


async def init_apns_process(message):
    logging.info("Process Msg: {0}".format(message))
    await send_apns_notification(token)


async def send_apns_notification(device_token):
    global start_time
    apns = None
    apns = mi_apns_dev
    apns_payload = {"alert": "hello world 121", "serial": 21, "sound": "mov.mp3",
                    "mutable-content": "1", "category": "done", "id": -9999}

    if apns is not None:
        request = NotificationRequest(
            device_token=device_token,
            message={"aps": apns_payload}
        )

        ret = await apns.send_notification(request)
        if not ret.is_successful:
            logger.error(
                f"Failed to send APNS notification ID:")
        else:
            logging.info(f"Successfully sent APNS notification"
                         f" Time Taken in processing: {time.time() - start_time}")
             #update shared FLAG

def callback_value(message):
    apns_msg_queue.put(message)
    # return based on value of shared FLAG
    return True


if __name__ == '__main__':
    logger.setup()
    mi_apns_dev = APNs('apns.pem', use_sandbox=False)
    loop = asyncio.get_event_loop()

    subscriber = sub.PubSubSubscriber()
    subscriber.subscribe(subscription_name, callback_value, auto_ack_on_received=False)

    while True:
        if apns_msg_queue.qsize() > 0:
            msg = apns_msg_queue.get()
            if msg is None:
                break
            loop.run_until_complete(init_apns_process(msg))
        time.sleep(5)

Может иметь некоторый общий флаг между двумя вышеуказанными задачами и ACK PubSub в зависимости от его значения?

...