Как я могу создать оператор Esper и настроить подписчика атомарно, чтобы я не пропустил ни одного обновления? - PullRequest
0 голосов
/ 28 января 2020

У меня есть многопоточная программа, выполняющая CEP с использованием Esper. Пока события уже отправляются через движок, я создаю новую инструкцию и устанавливаю подписчика. Поскольку метод createEPL автоматически запускает оператор, возможно, я пропущу обновление, инициируемое оператором, между тем, когда оператор запущен и когда мой подписчик установлен. Рассмотрим пример:

EPStatement myStatement = epService.getEPAdministrator().createEPL(myStatementString);
// another thread sends an event that triggers my statement which at this moment has no subscribers
myStatement.setSubscriber(MySubscriber.this);
// now my subscriber will get updates but it potentially missed some since the statement was created

Я мог бы создать свои собственные блокировки для решения этой проблемы, но мне было интересно, есть ли альтернативный подход для одновременного создания оператора и установки подписчика, предоставляемого API Esper? Или, альтернативно, это способ создания EPStatement без его автоматического запуска, чтобы я мог назначить своего подписчика, а затем запустить оператор вручную?

1 Ответ

0 голосов
/ 28 января 2020

Вам необходимо получить блокировку и выполнить несколько шагов, связанных с развертыванием внутри блокировки. В документации atomi c развертывание

runtime.getRuntimeInstanceWideLock().writeLock().lock();
// Start atomic management unit. 
// Any events concurrently being processed by other threads must complete before the code completes obtaining the lock. 
// Any events sent in by other threads will await the release of the lock.
try {
  // Perform operations such as : 
  //   - deploy and/or undeploy multiple compiled modules  (deployment admin API)
  //   - set statement listeners and subscribers while deploying
  // There is no need to obtain this lock when deploying or undeploying a single module.
  // The lock is reentrant and can be safely taken multiple times by the same thread.
  // Make sure you use "try" and "finally" just like we have it here.
}
finally {
  // Complete atomic management unit. 
  // Any events sent in by other threads will now continue processing against the changed set of statements.
  runtime.getRuntimeInstanceWideLock().writeLock().unlock();
}
...