Как разработать сервис, который обрабатывает сообщения, поступающие в очередь - PullRequest
4 голосов
/ 20 июля 2009

У меня вопрос по дизайну для многопоточной службы Windows, которая обрабатывает сообщения от нескольких клиентов. Правила

  • Каждое сообщение должно обрабатывать что-либо для объекта (с уникальным идентификатором) и может быть различным, т. Е. DoA, DoB, DoC и т. Д. Идентификатор объекта находится в полезной нагрузке сообщения.
  • Обработка может занять некоторое время (до нескольких секунд).
  • Сообщения должны обрабатываться в порядке поступления для каждой сущности (с одинаковым идентификатором).
  • Однако сообщения могут обрабатываться для другого объекта одновременно (т. Е. Если они не совпадают с идентификатором объекта)
  • Количество параллельных обработок настраивается (обычно 8)
  • Сообщения не могут быть потеряны. Если при обработке сообщения возникает ошибка, то это сообщение и все другие сообщения для того же объекта должны быть сохранены для дальнейшей обработки вручную.
  • Сообщения поступают в транзакционную очередь MSMQ.

Как бы вы разработали услугу. У меня есть рабочее решение, но я хотел бы знать, как другие будут решать это.

Ответы [ 4 ]

1 голос
/ 29 марта 2010

Первое, что вам нужно сделать, это сделать шаг назад и подумать, насколько критична производительность для этого приложения. Вам действительно нужно обрабатывать сообщения одновременно? Это важно для миссии? Или ты просто думаешь , что тебе это нужно? Вы использовали профилировщик на своем сервисе, чтобы найти реальные узкие места процессов и оптимизировать их?

Причина, по которой я спрашиваю, заключается в том, что вы упомянули, что хотите 8 одновременных процессов - однако, если вы сделаете это приложение однопоточным, это значительно уменьшит сложность, время разработки и тестирования ... И так как вы хотите только 8, это почти не стоит ...

Во-вторых, поскольку вы можете обрабатывать только параллельные сообщения на одном и том же объекте - как часто вы действительно будете получать параллельные запросы от вашего клиента на получение одного и того же объекта ? Стоит ли добавлять так много уровней сложности для варианта использования, который может появляться не очень часто?

Я бы поцеловал. Я бы использовал MSMQ через WCF и сохранил бы мой сервис WCF как одиночный. Теперь у вас есть мощность, заказанная надежность MSMQ, и вы теперь соответствуете вашим реальным требованиям. Затем я проверил бы его при высокой нагрузке с реалистичными данными и запустил профилировщик, чтобы найти узкие места , если я обнаружил, что это было слишком медленно. Только тогда я справлюсь со всеми дополнительными трудностями создания гораздо более сложного приложения для управления параллелизмом только для конкретных случаев использования ...

Один из подходов, который следует рассмотреть, - это создание центральной службы «сторожевого устройства» или «служебной шины», которая получает все сообщения от клиентов и затем передает эти сообщения фактическим рабочим службам. Когда он получает запрос, он затем обнаруживает, обрабатывает ли другой из его клиентов уже сообщение для того же объекта - если это так, он отправляет его той же службе, которой отправил другое сообщение. Таким образом, вы можете одновременно обрабатывать одни и те же сообщения для данной сущности и ничего более ... И у вас есть простота плавной масштабируемости ... Однако я бы сделал это, только если бы мне это было абсолютно необходимо, и это было доказано с помощью профилирования и тестирования и не потому, что «мы думаем, что нам это нужно» (см. руководителя YAGNI:))

0 голосов
/ 11 февраля 2010

Я бы посмотрел на наличие n потоков, каждый из которых считывал бы из одной потокобезопасной очереди. Затем я бы хэшировал EntityId, чтобы решить, в какую очередь ставить входящее сообщение.

Иногда некоторым потокам нечего делать, но разве это проблема, если у вас больше потоков, чем у процессоров?

(Также вы можете пожелать сгруппировать объекты по типу в очереди, чтобы уменьшить количество конфликтов блокировки в вашей базе данных.)

0 голосов
/ 21 февраля 2010

Мой подход будет следующим:

  1. Создайте пул потоков с вашим настраиваемым количеством потоков.
  2. Хранить карту идентификаторов сущностей и связывать каждый идентификатор с очередью сообщений.
  3. Получив сообщение, поместите его в очередь соответствующего идентификатора объекта.
  4. Каждый поток будет смотреть только на выделенный ему идентификатор сущности (например, создать класс, который инициализирован как такая служба (идентификатор EntityID)).
  5. Пусть поток обрабатывает только сообщения из очереди своего выделенного идентификатора сущности.
  6. Как только все сообщения обработаны для данного идентификатора объекта, удалите идентификатор из карты и выйдите из цикла потока.
  7. Если в пуле потоков есть место, добавьте новый поток для обработки следующего доступного идентификатора сущности.

Вам придется управлять сообщениями, которые не могут быть обработаны в данный момент, включая ситуации, когда обработка сообщений не выполняется. Создать журнал сообщений и т. Д.

Если у вас есть доступ к параллельной карте (карта без блокировки / без ожидания), то вы можете иметь несколько считывателей и писателей на карту без необходимости блокировки или ожидания. Если вы не можете получить параллельную карту, тогда все непредвиденные обстоятельства будут на карте: всякий раз, когда вы добавляете сообщения в очередь на карте или добавляете новые идентификаторы сущностей, вы должны заблокировать ее. Лучше всего обернуть карту в структуру, которая предлагает методы для чтения и записи с соответствующей блокировкой.

Не думаю, что вы заметите какое-либо существенное влияние на производительность от блокировки, но если вы начнете ее видеть, я бы посоветовал вам создать собственную хэш-карту без блокировки: http://www.azulsystems.com/events/javaone_2007/2007_LockFreeHash.pdf

Внедрение этой системы не будет рудиментарной задачей, поэтому возьмите мои комментарии в качестве общего руководства ... инженер должен реализовать идеи, которые применимы.

0 голосов
/ 30 августа 2009

Хотя мои требования отличались от ваших, мне приходилось иметь дело с параллельной обработкой из очереди сообщений. Мое решение состояло в том, чтобы иметь службу, которая будет проверять каждое входящее сообщение и передавать его агентскому процессу для использования. У службы есть настройка, которая контролирует, сколько агентов она может запустить.

...