Инфраструктура Java-сервера для прослушивания операторов PostgreSQL NOTIFY - PullRequest
1 голос
/ 26 декабря 2011

Мне нужно написать сервер, который прослушивает операторы PostgreSQL NOTIFY и рассматривает каждое уведомление как запрос на обслуживание (фактически, больше похоже на задачу для обработки). Мои основные требования:

1) Механизм опроса на PGConnection (В идеале это будет прослушиватель, но в реализации PgJDBC мы должны опрашивать ожидающие уведомления. Ссылка )

2) Выполнить обратный вызов на основе «запроса» (используя имя канала в уведомлении NOTIFY) в отдельном потоке.

3) Встроенные средства управления потоками (создавать / удалять потоки, когда задача обрабатывается / завершается, ставить в очередь, если одновременно выполняется слишком много задач и т. Д.)

Требования 1 и 2 - это то, что мне легко реализовать самостоятельно. Но я бы предпочел не писать управление потоками самостоятельно.

Существует ли существующая структура, отвечающая этим требованиям? Дополнительным преимуществом было бы, если среда автоматически генерирует статистику запросов.

1 Ответ

1 голос
/ 27 декабря 2011

Честно говоря, требование 3, вероятно, можно легко выполнить, просто используя стандартные реализации ExecutorService от Executors, которые позволят вам, например, получить пул потоков фиксированного размера и передать им работу в форме Runnable или Callable.реализации.Они будут иметь дело с мрачными деталями создания потоков до предела и т. Д. Затем вы можете попросить слушателя внедрить тонкий слой Runnable для сбора статистики и т. Д.

Что-то вроде:

private final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
private final NotificationCallback callback;
private int waiting, executing, succeeded, failed;

public void pollAndDispatch() {
   Notification notification;
   while ((notification = pollDatabase()) != null) {
      final Notification ourNotification = notification;
      incrementWaitingCount();
      threadPool.submit(new Runnable() {
         public void run() {
           waitingToExecuting();
           try {
             callback.processNotification(ourNotification);
             executionCompleted();
           } catch (Exception e) {
             executionFailed();
             LOG.error("Exeception thrown while processing notification: " + ourNotification, e);
           }
         }
      });
   }
}
// check PGconn for notification and return it, or null if none received
protected Notification pollDatabase() { ... }
// maintain statistics
private synchronized void incrementWaitingCount() { ++waiting; }
private synchronized void waitingToExecuting() { --waiting; ++executing; }
private synchronized void executionCompleted() { --executing; ++succeeded; }
private synchronized void executionFailed() { --executing; ++failed; }

Если вы хотите быть модным, поместите уведомления в очередь JMS и используйте его инфраструктуру для прослушивания новых элементов и их обработки.

...