Как ограничить количество полученных подписок в среде интенсивного производителя - PullRequest
0 голосов
/ 09 января 2020

Я использую публикацию / подписку для проекта и хочу найти решение для проблемы, которую я предвижу в будущем.

Проблема, которую я вижу, состоит в том, что у нас будет много издателей, которых они будут обновление некоторых данных и приведет к опубликованию sh сообщения об обновлении (по указанному c топи c), будет много тем и много обновлений на каждую топи c (например, 1000 обновлений за 1 секунду в одной топи c и 2000 в другом), и я должен предотвратить это, поскольку я не хочу, чтобы мои подписчики получали тонны обновленного сообщения в секунду.

Мне просто важно иметь один обновлять каждую топи c каждые n секунд при появлении любого нового сообщения

Посмотрите на этот пример (n = 1 секунда):

[time: 0.00] *message (publish message as it is first message)
[time: 0.02] message (nop)
[time: 0.03] message (nop)
[time: 0.04] message (nop)
[time: 0.10] message (nop)
[time: 1.00] *(1 second after last publish) (publish message as as we had a message at 0.10) 
[time: 1.22] message (nop)
[time: 2.00] *(1 second after last publish) (publish message as as we had a message at 1.22) 
(NO UPDATE as no update from last publish)
[time: 5.50] message (publish message as it is first message in last second)
[time: 5.60] message (nop)
[time: 6.50] *(1 second after last publish) (publish message as as we had a message at 5.60) 

Инфраструктура JAVA, у нас в распоряжении есть rabbitmq, appsyn c, aws сервировок.

На данный момент предлагаются следующие решения:

  1. Использовать блокирующий поток для приостановки после первого сообщения и игнорирования следующего сообщения. s в той же самой topi c с тайм-аутом для запуска окончательного сообщения после n секунд и завершения его работы.

  2. Сохранить временную метку сообщения для каждой topi c введите в кеш, игнорируйте новые сообщения, если ключ существует в кеше, запускайте кучу рабочих для обработки очереди и запускайте сообщения для сообщений старше n секунд и удаляйте сохраненный ключ из кеша.

  3. Запустите первое сообщение и сохраните метку времени в кеше для topi c и не запускайте никаких сообщений после тех пор, пока новое сообщение меньше n секунды старый (в этом случае мы потеряем последнее сообщение)

У каждого решения есть свои плюсы и минусы, я ищу еще несколько мозгов, чтобы узнать, какие есть другие варианты, может быть, система очередей может помочь?

Ответы [ 2 ]

1 голос
/ 09 января 2020

Этого можно достичь с помощью библиотек реактивного программирования, таких как Project Reactor .

. Вы можете получить Flux из тем и регулировать поток входящих сообщений, используя комбинацию filter(), skip() и buffer() операции. Вы можете найти более подробную информацию об указанных c доступных операциях здесь

1 голос
/ 09 января 2020

Добавить некоторый буфер для сообщений, вызвать фактическое значение pu sh для topi c из выделенного потока с фиксированными задержками между выполнениями:

public class Publisher implements Runnable {
int maxPendingMessages = 5000;
int maxBulkMessages = 10;

private final LinkedList<String> messages = new LinkedList<>();
private final Object lock = new Object();

public static void main(String[] args) {
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(15);
    Publisher p = new Publisher();
    ses.scheduleAtFixedRate(p, 0, 1000, TimeUnit.MILLISECONDS);
    for (int i = 0; i < 5000; i++) {
        p.sendMessage("test " + i);
    }
}
public void sendMessage(String msg) {
    Assert.notNull(msg, "topic message should not be empty!");
    if (messages.size() > maxPendingMessages) {
        synchronized (lock) {
            messages.pollFirst();
        }
    }
    synchronized (lock) {
        messages.add(msg);
    }
}
public void run() {        
    int total = messages.size();
    if (total > maxBulkMessages) {
        total = maxBulkMessages;
    }
    int count = 0;
    while (count <= total) {
        if (Thread.interrupted()) {
            break;
        }
        count++;
        try {
            final String m;
            synchronized (lock) {
                m = messages.pollFirst();
            }
            if (m == null) {
                // don't try to send empty ( null ) message
                continue;
            }
            System.out.println("message was:" + m);
            // actual push to topic here
        } catch (Exception e) {
            // no need to process pending messages on delivery errors
            try {
                messages.clear();
            } catch (Exception ee) {
            }
        }
    }
} }
...