Как заставить поток / задание подождать, пока предыдущее задание не будет завершено, и затем возобновить его при некотором уведомлении - PullRequest
0 голосов
/ 07 ноября 2019

У меня есть верблюжий маршрут, который выполнит обновление для данного идентификатора.

Для одного запроса все работает нормально, скажем Если у меня несколько запросов с одним и тем же идентификатором, скажем 10, то я хочуодин запрос выполняется за один раз .

или заставляет ждать другой запрос с тем же идентификатором, в то время как у меня есть запрос с другим идентификатором, который можно обработать без ожидания, скажем, 12.

Дляэто я создал дополнительный маршрут в моем контексте верблюда "addRequestToPool", который направит весь запрос, приходящий к верблюду "updateTicket", в 1 место, и оттуда должна быть выполнена обработка 1 на 1.

Iхотите реализовать что-то вроде ниже :

Когда запрос приходит к методу «synchronizeRequests», он должен проверить, существует ли уже существующее задание с docketId, если есть make, дождаться завершения старого процесса (скажем,10). если такого идентификатора нет (скажем, 12), продолжайте обработку.

Как только старый процесс завершен, он должен сообщить другим работам с тем же идентификатором, ожидающим, что 1 из них может возобновить там работу с того места, где они ожидают, так что ответ должен быть отправлен.

Сбор запросов выполнениз маршрута "addRequestToPool", как только приходит запрос, он создает поток для каждого запроса и вызывает метод синхронизации. Внутри метода синхронизации «RequestSynchronization» я хочу выполнить операцию проверки для существующего задания с идентификатором, если оно существует, дождаться уведомления от предыдущего, если не получен ответ, и продолжить обработку.

После завершения задания с использованием маршрута »removeRequestFromPool "текущий идентификатор задания удаляется, и о новом задании в группе (тот же идентификатор) следует уведомлять о продолжении.

Я хотел бы знать, как реализовать описанный выше сценарий или любой другой подходящий подход, еслиany?

Ниже приведен пример кода для справки,

Верблюжий маршрут:

from("direct:updateTicket")
    .to("direct:addRequestToPool")
    .to("direct:getDetailForTicket")
    .to("direct:updateDetailForTroubleTicket")
    .to("direct:getDetailForTicket")
    .to("direct:removeRequestFromPool")
    .to("direct:endRoute");

Вызов API:

public class TicketRequestManagement {

    public SimpleActionResponse addRequestToPool(int docketNo) throws InterruptedException {

        SimpleActionResponse response = new SimpleActionResponse();

        RequestSynchronization requestSynchronization = new RequestSynchronization(docketNo);
        Thread thread = new Thread(requestSynchronization);
        thread.start();
        thread.join();
        return response;
    }
}

Метод синхронизации:

public class RequestSynchronization implements Runnable {

    private static Logger logger = Logger.getLogger(RequestSynchronization.class);

    private int docketNo;

    public RequestSynchronization(int docketNo) {
        super();
        this.docketNo = docketNo;
    }

    @Override
    public void run() {
        synchronizeRequests(docketNo);
    }

    public synchronized static void synchronizeRequests(int docketNo) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

Я думал о добавлении docketId в статическую структуру данных (Set) и проверке, существует ли Id wait (Как мне выполнить это ожидание?), Как только процесс завершится, удалите ID из set и уведомитеожидание возобновления оттуда (как мне выполнить это уведомление с тем же идентификатором?).

Дайте мне знать, еслинужна какая-то ясность.

PS: я не ищу сна, это просто для выполнения теста синхронизации.

Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 11 ноября 2019

Мое предложение: идти асинхронно . Например, используйте JMS (не напрямую, а с отличной поддержкой JMS Camels ) для разделения запросов и их обработки . Таким образом, вы можете контролировать весь поток данных просто с помощью счетчика.

В вашем случае это будет

  • addRequestToPool помещает запрос в очередь JMS
  • потребитель JMS получает запросы из очереди и обрабатывает их
  • Использование групп сообщений для синхронизации запросов с тем же идентификатором
  • Использование счетчик потребления , чтобы убедиться, что другие идентификаторы обрабатываются параллельно и масштабироваться (вы пишете 0 строк кода для многопоточности)
  • Клиенты могут отправлять запросы на обновление, даже если компонент (ы) обработкинедоступны для обслуживания

Группы сообщений являются функцией JMS-брокера (по крайней мере, ActiveMQ). Вы устанавливаете свой идентификатор как JMSXGroupID, и брокер следит за тем, чтобы все сообщения с одинаковым идентификатором обрабатывались одним и тем же потребителем . Если у вас есть 5 потребителей и вы получаете 10 сообщений с одинаковым идентификатором, всю работу выполняет 1 потребитель. Как только приходит сообщение с другим идентификатором, другой потребитель обрабатывает его параллельно.

Вам решать, если вы создадите один большой компонент JMS, который выполняет всю обработку, или если вы создадите несколько компонентов, которые передают сообщение через несколько очередей в рамках вашего рабочего процесса обработки.

0 голосов
/ 07 ноября 2019

Прежде всего, не запускайте Thread вручную, для этого ExecutorService s.

Во-вторых, я бы посоветовал вам поддерживать запущенные синхронизации в Map<Integer, Future<>>, чтобы выможно проверить, что что-то уже запущено для этого идентификатора.

Черновой вариант будет

class TicketRequestService {
    private Map<Integer, Future<?>> runningRequests = new ConcurrentHashMap<>();
    ExecutorService executor = Executors.newCachedThreadPool();

    public void synchronizeRequest(int docketId) {
        // get the running Future if present, otherwise...
        Future<?> future = runningRequests.computeIfAbsent(docketId,
            // ...create a new one
            id-> executor.submit(() -> {
                RequestSynchronization.synchronizeRequests(id));
                // remove future from map after sync is finished
                runningRequests.remove(docketId);
            });
        future.get(); // wait until finished; this throws Exceptions, you need to handle them
        return new SimpleActionResponse();
   }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...