Параллельная обработка сообщений со строгим порядком - PullRequest
10 голосов
/ 20 марта 2012

В моем веб-приложении JavaEE мне нужно обрабатывать входящие сообщения строго в порядке поступления. Я предполагаю, что мой контейнер веб-приложений (Tomcat 6) сохраняет порядок сообщений по мере их поступления на порт http.

Что вызывает у меня головную боль, так это то, как я внутренне обрабатываю эти сообщения. Для улучшения рабочей нагрузки я добавляю обработку каждого сообщения в ThreadPool, так как здесь нужно сделать много вещей, например, Синтаксический анализ XML, иногда обогащение данных с использованием внешних веб-сервисов. После завершения обработки я помещаю Java-представление сообщения в сложный механизм обработки потоков esper.codehaus.org , который является потокобезопасным. Здесь проверяются различные шаблоны, когда порядок поступления является самым высоким требованием, например, превышение порога явления.

У меня была идея вставить каждое обработанное сообщение в PriorityQueue с идентификатором приоритета, который они получили во время прибытия (в моем сервлете, где оно увеличивается для каждого сообщения). Проблема заключается в следующем:

Поток, который опрашивает элементы из очереди (самый низкий идентификатор является главой очереди) для вставки его в esper, может пропустить идентификатор, так как он не проверяет отсутствующие элементы. Я думаю, что иллюстрация работает лучше:

enter image description here

Для шагов (1) - (4) все работает как задумано. Но на этапе (5) QueuePoller извлекает элемент 6, а не элемент 4 (который позже вставляется на этапе (6)). Это приводит к порядку сообщений: 2; 3; 6; 4.

То, что я пытался сделать, - это изменить реализацию опроса головы очереди, чтобы она следовала строгому порядку идентификаторов. Это означает, что если следующий элемент идентификатора еще не вставлен в очередь, подождите у барьера, пока он не появится. Казалось, это работало в течение первых 10 минут, но затем зависало, вероятно, из-за элемента, который никогда не вставлялся в очередь.

У кого-нибудь была подобная проблема в прошлом и есть какой-то намек на меня?

Ответы [ 4 ]

3 голосов
/ 20 марта 2012

Выезд Disruptor - высокопроизводительная очередь со строгим порядком (первый вход - первый обслуженный)

1 голос
/ 20 марта 2012

Вы можете мгновенно добавить заполнитель для входящих запросов в вашу очередь обработки. Заполнитель предварительно обрабатывается в фоновом режиме пулом потоков, но основная обработка ожидает завершения предварительной обработки. Я имею в виду конструкцию Future .

0 голосов
/ 20 марта 2012

Как свидетельствует ваша проблема и необходимость в диаграмме (кстати, +1 за это), очередь с приоритетами не является хорошей конструкцией для того, что вы хотите. Это потому, что очередь очень рада обслуживать вас доступными 6, а не ждать недоступных 4.

Я думаю, пришло время свернуть свой собственный синхронизированный контейнер.

0 голосов
/ 20 марта 2012

Библиотека классов обеспечивает гибкую реализацию пула потоков вместе с некоторыми полезными предопределенными конфигурациями.Вы можете создать пул потоков, вызвав один из статических методов фабрики в Executors:

Для ваших нужд я считаю Executors.newSingleThreadExecutor () лучшим.Однопоточный исполнитель создает единый рабочий поток для обработки задач, заменяя его, если он неожиданно умирает.Задачи гарантированно обрабатываются последовательно в соответствии с порядком, установленным очередью задач (FIFO, LIFO, приоритетный порядок).

...