Java асинхронная обработка - PullRequest
       18

Java асинхронная обработка

2 голосов
/ 03 февраля 2012

В настоящее время я занимаюсь разработкой системы, которая использует асинхронную обработку. Передача информации осуществляется с использованием очередей. Таким образом, один процесс поместит информацию в очередь (и прекратит работу), а другой получит ее и обработает. Моя реализация ставит меня перед рядом проблем, и меня интересует, какой подход у всех есть к этим проблемам (с точки зрения архитектуры, а также библиотек).

Позвольте мне нарисовать картину. Допустим, у вас есть три процесса:

Process A -----> Process B
                      |
Process C <-----------|

Итак Процесс A помещает сообщение в очередь и завершается, Процесс B берет сообщение, обрабатывает его и помещает в очередь «возврата». Процесс C принимает сообщение и обрабатывает его.

  1. Как обрабатывать Процесс B , не прослушивающий или не обрабатывающий сообщения вне очереди? Существует ли какой-либо метод типа JMS, который запрещает источнику отправлять сообщение, когда потребитель не активен? Поэтому Процесс A отправит, но сгенерирует исключение.
  2. Допустим, Процесс C должен получить ответ в течение X минут, но Процесс B остановлен (по любой причине), существует ли какой-либо механизм, обеспечивающий тайм-аут для Очередь? Таким образом, гарантированный ответ в течение X минут, который будет запускать Процесс C .

Можно ли решить все эти вопросы с помощью какой-нибудь очереди недоставленных писем? Должен ли я делать все это вручную с помощью таймеров и проверки. Я упомянул JMS, но я открыт для всего, на самом деле я использую Hazelcast для очередей.

Обратите внимание, что это скорее архитектурный вопрос с точки зрения доступных технологий и методов Java, и я чувствую, что это правильный вопрос.

Любые предложения будут с благодарностью.

Спасибо

Ответы [ 6 ]

2 голосов
/ 03 февраля 2012

ИМХО, самое простое решение - использовать ExecutorService или решение, основанное на службе исполнителя. Это поддерживает очередь работы, запланированные задачи (для тайм-аутов).

Может также работать в одном процессе. (Я считаю, что Hazelcast поддерживает распределенный ExecutorService)

2 голосов
/ 03 февраля 2012

Я думаю, что на ваш первый вопрос уже ответили адекватно другие авторы.

На ваш второй вопрос то, что вы пытаетесь сделать, может быть возможным в зависимости от механизма обмена сообщениями, используемого вашим приложением.Я знаю, что это работает с IBM MQ.Я видел, как это делается с использованием WebSphere MQ Classes для Java , но не JMS.Это работает так: когда Процесс A помещает сообщение в очередь, он указывает время ожидания ответного сообщения.Если процессу A не удается получить ответное сообщение в течение указанного времени, система выдает соответствующее исключение.

Я не думаю, что в JMS есть стандартный способ обработки тайм-аутов запросов / ответов так, как вам нужно, поэтому вам, возможно, придется использовать классы, специфичные для платформы, такие как WebSphere MQ Classes для Java.

2 голосов
/ 03 февраля 2012

Мне кажется, что тип вопросов, которые вы задаете, "пахнет" тем, что очереди и асинхронная обработка не могут быть лучшими инструментами в вашей ситуации.

1) Это наносит ущерб цели очереди,Похоже, вам нужен синхронный процесс запрос-ответ.

2) Процесс C вообще не получает ответа.Он получает сообщение из очереди.Если в очереди есть сообщение и Процесс C готов, он его получит.Процесс C может решить, что сообщение устарело, как только оно его получит, например.

1 голос
/ 03 февраля 2012

Я использовал аналогичный подход для создания системы очередей и обработки для задач транскодирования видео.В основном это работало так:

  1. Process A отправляет сообщение «расписание» на Arbiter Q, которое добавляет задание в свою очередь «ожидания».
  2. Process Bзапрашивает следующее задание из Arbiter Q, что удаляет следующий элемент из его очереди «ожидания» (с учетом некоторой пользовательской логики планирования, чтобы гарантировать, что один пользователь не может заполнить запросы на перекодировку, и запретить другим пользователям возможность перекодировать видео)и вставляет его в набор «обработки» перед возвратом задания обратно к Process B.Задание имеет метку времени, когда оно входит в набор «обработка».
  3. Process B завершает работу и отправляет «завершенное» сообщение на Arbiter Q, которое удаляет задание из набора «обработка», а затемизменяет некоторое состояние так, чтобы Process C знал, что задание выполнено.
  4. Arbiter Q периодически проверяет задания в своем наборе "обработки" и истекает время ожидания для всех, которые выполнялись в течение необычно длительного периода времени.Process A может свободно попытаться снова поставить в очередь то же самое задание, если оно того захочет.

Это было реализовано с использованием JMX (JMS был бы гораздо более уместным, но я отступил).Process A был просто потоком сервлета, который отвечал на инициированный пользователем запрос транскодирования.Arbiter Q был синглтоном MBean (сохраненным / реплицированным по всем узлам в кластере серверов), который получил сообщения «расписание» и «завершено».Его «очереди» с внутренним управлением были просто List экземплярами, и когда задание завершалось, оно изменяло значение в базе данных приложения для ссылки на URL транскодированного видеофайла.Process B был поток транскодирования.Его работа состояла в том, чтобы просто запросить работу, перекодировать ее, а затем отчитаться, когда она закончится.Снова и снова до конца времени.Process C был другой поток пользователя / сервлета.Он увидит, что URL-адрес доступен, и предоставит пользователю ссылку на скачивание.

В таком случае, если Process B умрет, то задания будут всегда находиться в очереди "ожидания".На практике, однако, этого никогда не было.Если ваш Process B не работает / не выполняет то, что должен делать, то я думаю, что это указывает на проблему в вашем развертывании / конфигурации / реализации Process B больше, чем на ваш общий подход.

1 голос
/ 03 февраля 2012

Вы ожидаете семантику синхронной обработки с асинхронной (обмен сообщениями) настройкой, которая невозможна.Я работал над WebSphere MQ и обычно, когда потребитель умирает, сообщения остаются в очереди навсегда (если вы не установите срок действия).Как только очередь достигает своей глубины, последующие сообщения перемещаются в очередь недоставленных сообщений.

1 голос
/ 03 февраля 2012

Ну, суть очередей в том, чтобы держать вещи довольно изолированными.

Если вы не застряли в какой-либо конкретной технологии, вы можете использовать базу данных для своих очередей.

Но сначала простой механизм, обеспечивающий координацию двух процессов, - это использование сокета.Если это целесообразно, просто сделайте так, чтобы процесс B создал прослушиватель с открытым сокетом на каком-то хорошо известном порту, и процесс A подключится к этому сокету и будет следить за ним.Если процесс B когда-либо уходит, процесс A может сказать, потому что его сокет отключается, и он может использовать это как предупреждение о проблемах с процессом B.

Для проблемы B -> C, иметь таблицу db:

create table queue (
    id integer,
    payload varchar(100), // or whatever you can use to indicate a payload
    status varchar(1),
    updated timestamp
)

Затем Процесс A помещает свою запись в очередь с текущим временем и состоянием «B».B прослушивает очередь:

select * from queue where status = 'B' order by updated

Когда B завершается, он обновляет очередь, чтобы установить состояние на "C".

Между тем, "C" опрашивает DB с:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 

(с таким пороговым значением, как долго вы хотите, чтобы вещи гнили в очереди).

Наконец, C обновляет строку очереди до 'D' для выполненного или удаляет егоили что угодно.

Темная сторона в том, что здесь есть некоторое состояние гонки, когда C может попытаться захватить вход, пока B только запускается.Вы можете, вероятно, пройти через это со строгим уровнем изоляции и некоторой блокировкой.Что-то простое:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 
FOR UPDATE

Также используйте FOR UPDATE для выбора B.Таким образом, тот, кто победит в выбранной гонке, получит эксклюзивную блокировку на ряду.

Это продвинет вас довольно далеко в плане реальной функциональности.

...