google.cloud.pubsub - Потоковое извлечение сообщений PubSub - PullRequest
0 голосов
/ 02 июля 2018

В настоящее время я провожу некоторые тесты для последней версии google-cloud-pubsub==0.35.4 pubsub. Мое намерение состоит в том, чтобы обработать бесконечный поток (изменяющийся в нагрузке), используя динамическое количество клиентских клиентов.

Однако, когда у меня есть очередь, скажем .. 600 сообщений и 1 клиент работает, а затем добавляются дополнительные клиенты:

  • Ожидается: все оставшиеся сообщения распределяются равномерно по всем клиентам
  • Наблюдение: только новые сообщения распределяются по клиентам, все старые сообщения отправляются уже существующим клиентам

Ниже приведена упрощенная версия того, что я использую для своих клиентов (для справки мы будем использовать только тему с низким приоритетом). Я не буду включать издателя, поскольку он не имеет никакого отношения.

PRIORITY_HIGH = 1
PRIORITY_MEDIUM = 2
PRIORITY_LOW = 3

MESSAGE_LIMIT = 10
ACKS_PER_MIN = 100.00
ACKS_RATIO = {
    PRIORITY_LOW: 100,
}

PRIORITY_TOPICS = {
    PRIORITY_LOW: 'test_low',
}

PRIORITY_SEQUENCES = {
    PRIORITY_LOW: [PRIORITY_LOW, PRIORITY_MEDIUM, PRIORITY_HIGH],
}


class Subscriber:
    subscriber_client = None
    subscriptions = {}

    priority_queue = defaultdict(Queue.Queue)
    priorities = []

    def __init__(self):
        logging.basicConfig()
        self.subscriber_client = pubsub_v1.SubscriberClient()

        for option, percentage in ACKS_RATIO.iteritems():
            self.priorities += [option] * percentage

    def subscribe_to_topic(self, topic, max_messages=10):
        self.subscriptions[topic] = self.subscriber_client.subscribe(
            BASE_TOPIC_PATH.format(project=PROJECT, topic=topic,),
            self.process_message,
            flow_control=pubsub_v1.types.FlowControl(
                max_messages=max_messages,
            ),
        )

    def un_subscribe_from_topic(self, topic):
        subscription = self.subscriptions.get(topic)
        if subscription:
            subscription.cancel()
            del self.subscriptions[topic]

    def process_message(self, message):
        json_message = json.loads(message.data.decode('utf8'))
        self.priority_queue[json_message['priority']].put(message)

    def retrieve_message(self):
        message = None
        priority = random.choice(self.priorities)
        ack_priorities = PRIORITY_SEQUENCES[priority]

        for ack_priority in ack_priorities:
            try:
                message = self.priority_queue[ack_priority].get(block=False)
                break
            except Queue.Empty:
                pass

        return message


if __name__ == '__main__':
    messages_acked = 0

    pub_sub = Subscriber()
    pub_sub.subscribe_to_topic(PRIORITY_TOPICS[PRIORITY_LOW], MESSAGE_LIMIT * 3)

    while True:
        msg = pub_sub.retrieve_message()
        if msg:
            json_msg = json.loads(msg.data.decode('utf8'))

            msg.ack()
            print ("%s - Akked Priority %s , High %s, Medium %s, Low %s" % (
                datetime.datetime.now().strftime('%H:%M:%S'),
                json_msg['priority'],
                pub_sub.priority_queue[PRIORITY_HIGH].qsize(),
                pub_sub.priority_queue[PRIORITY_MEDIUM].qsize(),
                pub_sub.priority_queue[PRIORITY_LOW].qsize(),
            ))

        time.sleep(60.0 / ACKS_PER_MIN)

Мне интересно, является ли это поведение присущим тому, как потоковое извлечение функционирует, или существуют конфигурации, которые могут изменить это поведение.

Ура!

1 Ответ

0 голосов
/ 19 октября 2018

Учитывая документацию Cloud Pub / Sub , Cloud Pub / sub доставляет каждое опубликованное сообщение как минимум один раз для каждой подписки, тем не менее, есть некоторые исключения из этого поведения:

  • Сообщение, которое не может быть доставлено в течение максимального срока хранения 7 дней, удаляется.
  • Сообщение, опубликованное до создания подписки, доставлено не будет.

Другими словами, служба будет доставлять сообщения в подписки, созданные до публикации сообщения, поэтому старые сообщения не будут доступны для новых подписок. Насколько я знаю, Cloud Pub / Sub не предоставляет возможности для изменения этого поведения.

...