Как TTL (время жизни) применяется к пространству имен? - PullRequest
1 голос
/ 20 марта 2019

Apache Pulsar имеет функцию TTL, как описано в разделе Хранение и срок действия сообщения официальной документации.Однако я не могу определить, где в конфигурации указано, как часто выполняется эта проверка.Используя стандартную команду bin/pulsar standalone, с настраиваемым пространством имен, с ttl, настроенным на 5 секунд bin/pulsar-admin namespaces set-message-ttl public/ttl-test --messageTTL 5.

. Я вижу, что срок действия сообщений истекает только через заданный интервал, и следующее сообщение журналавывод на консоль:

15: 11: 59.337 [pulsar-msg-expiry-monitor-52-1] INFO org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [постоянный: // public / ttl-test / my-topic] [spark-shell] Проверка истечения срока действия начального сообщения, ttl = 5 секунд

Суть моего вопроса такова: как я могу увеличить скорость, с которойсообщения проверяются на предмет превышения TTL?

Ответы [ 2 ]

2 голосов
/ 20 марта 2019

Конфигурация messageExpiryCheckIntervalInMinutes в брокере определяет, как часто темы пространства имен проверяются на наличие сообщений с истекшим сроком действия.

В соответствии с официальной документацией конфигурация

0 голосов
/ 10 мая 2019

Используйте команду set-message-ttl и укажите имя пространства имен (по умолчанию public / default для постоянной темы) и время.

bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 120

Пример кода производителя и потребителя для достижения ttl (клиент Python)

import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic-reader1')

producer.send(('Hello-Pulsar1').encode('utf-8'))
producer.send(('Hello-Pulsar2').encode('utf-8'))
producer.send(('Hello-Pulsar3').encode('utf-8'))

producer.close()
client.close()

Вы можете отправить несколько сообщений, используя метод отправки.Название темы должно быть одинаковым в обоих классах.

import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe("my-topic-reader1", "my-subscription")

//receive all the messages.whatever we publish
msg = consumer.receive()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))

//Here we are not acknowledge all the messages.

//close the consumer and client
consumer.close()
client.close()

В течение 120 секунд мы снова открываем клиента и потребителя и пытаемся прочитать одинаковые сообщения, что не публикуется. Снова закрываем клиента и потребителя.

Позже (через 120 секунд), снова мы откроем клиента и потребителя, затем попытаемся получить сообщение.Но это не должно прийти.В этом состоянии вы достигаете Время жить.

...