Google Cloud PubSub обеспечивает Как минимум один раз ( документы ). Это означает, что сообщения могут быть доставлены более одного раза. Чтобы решить эту проблему, вам нужно сделать вашу программу / систему idempotent
У вас есть несколько подписчиков, каждый из которых тянет по 8 сообщений.
Чтобы избежать обработки одного и того же сообщения несколькими подписчиками, acknowledge
сообщение, как только любой подписчик извлекает это сообщение и переходит к дальнейшей обработке, а не подтверждает его в конце, после полной обработки сообщения.
Кроме того, вместо непрерывного запуска основного сценария используйте sleep
в течение некоторого постоянного времени, когда в очереди нет сообщений.
У меня был похожий код, где я использовал синхронное извлечение, за исключением того, что я не использовал параллельную обработку.
Вот код:
PubSubHandler - класс для обработки операций, связанных с Pubsub
from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded
class PubSubHandler:
def __init__(self, subscriber_config):
self.project_name = subscriber_config['PROJECT_NAME']
self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']
self.subscriber = pubsub_v1.SubscriberClient()
self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)
def pull_messages(self,number_of_messages):
try:
response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
received_messages = response.received_messages
except DeadlineExceeded as e:
received_messages = []
print('No messages caused error')
return received_messages
def ack_messages(self,message_ids):
if len(message_ids) > 0:
self.subscriber.acknowledge(self.subscriber_path, message_ids)
return True
Utils - класс для утилитарных методов
import json
class Utils:
def __init__(self):
pass
def decoded_data_to_json(self,decoded_data):
try:
decoded_data = decoded_data.replace("'", '"')
json_data = json.loads(decoded_data)
return json_data
except Exception as e:
raise Exception('error while parsing json')
def raw_data_to_utf(self,raw_data):
try:
decoded_data = raw_data.decode('utf8')
return decoded_data
except Exception as e:
raise Exception('error converting to UTF')
Orcestrator - Основной скрипт
import time
import json
import logging
from utils import Utils
from db_connection import DbHandler
from pub_sub_handler import PubSubHandler
class Orcestrator:
def __init__(self):
self.MAX_NUM_MESSAGES = 2
self.SLEEP_TIME = 10
self.util_methods = Utils()
self.pub_sub_handler = PubSubHandler(subscriber_config)
def main_handler(self):
to_ack_ids = []
pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)
if len(pulled_messages) < 1:
self.SLEEP_TIME = 1
print('no messages in queue')
return
logging.info('messages in queue')
self.SLEEP_TIME = 10
for message in pulled_messages:
raw_data = message.message.data
try:
decoded_data = self.util_methods.raw_data_to_utf(raw_data)
json_data = self.util_methods.decoded_data_to_json(decoded_data)
print(json_data)
except Exception as e:
logging.error(e)
to_ack_ids.append(message.ack_id)
if self.pub_sub_handler.ack_messages(to_ack_ids):
print('acknowledged msg_ids')
if __name__ == "__main__":
orecestrator = Orcestrator()
print('Receiving data..')
while True:
orecestrator.main_handler()
time.sleep(orecestrator.SLEEP_TIME)