Синхронизировать один конкретный вызов веб-сервиса - PullRequest
0 голосов
/ 24 января 2011

Мне интересно, каким будет лучший подход к этой проблеме.У меня есть (абстрактно говоря) простой метод, который вызывает веб-сервис и сохраняет результат в локальном кеше в памяти, что-то вроде:

public Document callWebservice(SomeObject parameter) { 
    Document result = cache.get(parameter);
    if (result == null) {
        result = parse(retrieve(parameter));
        cache.put(result);
    }
    return result;
}

Теперь, если документ находится в кеше, он может простовернуть без проблем, хорошо.В однопоточной среде этот подход тоже работает отлично.Однако в многопоточной среде выясняется, что каждый поток будет обращаться к биту 'else' и вызывать веб-сервис несколько раз.

Я мог бы перебросить синхронизированный блок в части 'else', но яПоверьте, это слишком «широкий» размер блокировки - весь метод будет недоступен для вызова потоков, даже если они вызывают совершенно разные вещи.

Хорошо, если веб-служба вызывается дважды, если запрос другой (т. е. параметр SomeObject, в данном случае).

Теперь вопрос: какой подход лучше использовать в этом случае?

Я думал о сохранении параметра в коллекции (threadsafe)объект.Если содержимое параметра одинаково, он выдаст тот же результат hashCode / equals и будет найден в объекте коллекции, указывая, что другой поток уже обрабатывает этот запрос.Если это так, вызывающий поток может быть приостановлен, пока не вернется веб-служба.(Я должен был бы выяснить, как заставить вызывающий поток ждать все же).Будет ли это работать с блокировкой объекта параметра SomeObject?например:

private Map<SomeObject, SomeObject> currentlyProcessingItems = new ConcurrentHashMap<SomeObject, SomeObject>();
public Document callWebservice(SomeObject parameter) {
    if (currentlyProcessedItems.contains(parameter)) {
        parameter = currentlyProcessedItems.get(parameter);
    } else {
        currentlyProcessedItems.putIfAbsent(parameter);
    }
    synchronized(parameter) {
        Document result = cache.get(parameter);
        if (result == null) {
            Document result = parse(retrieve(parameter));
            cache.put(result);
        }
        currentlyProcessedItems.remove(parameter);
        return result;
    }
}

(примечание: логика для отслеживания текущих обрабатываемых запросов, использования ConcurrentHashMap и блокировки может быть неоптимальной или совершенно неправильной)

Нет, я фактически никогда не заканчивал читатькнига по резьбе.Я должен.

Я уверен, что эта конкретная проблема встречается довольно часто, я просто не смог найти ответ.Как называется такая ситуация (т. Е. Блокировка определенного объекта), если можно спросить?

Ответы [ 4 ]

1 голос
/ 24 января 2011

Внимание! Объекты xml DOM не поточно-ориентированы.поэтому (игнорируя проблемы безопасности потоков самого кэша) вы не можете совместно использовать экземпляр Document среди нескольких потоков.мы немного разбираемся в нашей собственной кодовой базе, так что я точно знаю, что это проблематично (не просто теоретическая проблема).

Что касается самого кэширования, я считаю, что у guava есть то, что вы ищете: вычислительная карта

0 голосов
/ 25 января 2011

Я попробовал свою собственную идею, и на самом деле она, кажется, работает нормально - во-первых, она использует ConcurrentHashMap (хотя хороший параллельный набор был бы лучше, пока не нашел правильной реализации) для вести список объектов, обрабатываемых в данный момент.

В самом методе он ищет, если карта уже содержит экземпляр идентификатора. Если это не так, он пытается вставить идентификатор, используя putIfAbsent(). Если между проверкой и вставкой другой поток вставляет ее, то метод putIfAbsent возвращает вставленный элемент другого потока, и этот элемент используется.

Затем, либо с новым идентификатором, либо с существующим, запускается синхронизированный блок, блокирующий экземпляр идентификатора. В синхронизированном блоке используется кэш, и если это не возвращает результаты, вызывается веб-служба. По сути, это означает, что все процессы, выполняющие один и тот же запрос (идентифицируемый идентификатором), могут обрабатывать только один из этих запросов одновременно, поэтому, когда первый вызывает веб-сервис, остальные ожидают в очереди. Когда наступает их очередь, результат находится в кэше, и ожидающие потоки могут получить его оттуда.

Что касается объекта Document, который не является потокобезопасным, я пока не уверен, что с этим делать. На моей локальной машине это, похоже, не вызывает никаких проблем, но подобные вещи имеют обыкновение появляться в производстве наугад. Grr. Думаю, придется разобраться с этим.

0 голосов
/ 24 января 2011

Я сам думал о проблеме, особенно когда получение (параметр) занимает много времени (для меня это подключение к серверу для проверки подлинности, запрос процесса / фильтра на сервере и т. Д.). Сейчас я еще не пробовал это сам, но ради обсуждения, как это звучит?

Чтобы использовать кеш, необходимо вызвать новый MyCache (ключ) .getValue () из потока, поскольку getValue будет заблокирован в методе getCacheValue (), пока это значение не станет доступным (для всех ожидающих потоков).

public class MyCache {

    private final static HashMap cacheMap = new HashMap();  // One Map for all
    private final static Vector fetchList = new Vector();   // One List for all

    private Object cacheValue;
    private boolean waitingState;

    public MyCache (Object key) {
        if (cacheMap.containsKey (key)) {       // somebody has done it
            cacheValue = cacheMap.get (key);
        } else {
            waitingState = true;
            if (fetchInProgress (key, false))   // someone else is doing it
                return;
            new Thread (new MyFetch (key)).start();
    }}

    synchronized private static boolean fetchInProgress (Object key, boolean remove) {
        if (remove) {
            fetchList.remove (key);
        } else {
            boolean fetchingNow = fetchList.contains (key);
            if (fetchingNow)
                return true;
            fetchList.addElement (key);
        }
        return false;
    }

    public Object getValue () {
        if (waitingState)
            getCacheValue (true);
        return cacheValue;
    }

    synchronized private void getCacheValue (boolean waitOnLock) {
        if (waitOnLock) {
            while (waitingState) {
                try {
                    wait();
                } catch (InterruptedException intex) {
        }}} else {
            waitingState = false;
            notifyAll();
    }}

    public class MyFetch implements Runnable {
        private Object fetchKey;
        public MyFetch (Object key) {
            fetchKey = key;     // run() needs to know what to fetch
        }

        public void run () {        // Grab the resource, handle exception, etc.
            Object fetchedValue = null; 
            // so it compiles, need to replace by callWebService (fetchKey);
            cacheMap.put (fetchKey, fetchedValue);  // Save for future use
            fetchInProgress (fetchKey, true);   // Remove from list
            getCacheValue (false);          // Notify waiting threads
}}}
0 голосов
/ 24 января 2011

Java 5 и выше имеют классы, которые помогают делать именно это. Вам может понравиться ConcurrentHashMap: http://www.javamex.com/tutorials/synchronization_concurrency_8_hashmap.shtml

...