Многопоточный кеш (не для графики) в Java? - PullRequest
0 голосов
/ 14 сентября 2009

Недавно я искал способ реализовать двунаправленный буферизированный потокобезопасный кеш для обычных объектов.

Необходимость возникла из-за того, что у нас было несколько кэшированных структур данных, которые неоднократно обрабатывались для каждого запроса, и их необходимо было перезагружать из кэша из очень большого документа (1 с + время демаршаллинга), и мы не могли позволить, чтобы все запросы были задерживается так долго каждую минуту.

Так как я не смог найти хорошую реализацию с защитой потоков, я написал свою собственную, и теперь мне интересно, если она правильная и можно ли ее уменьшить ... Вот она:

package nl.trimpe.michiel

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Abstract class implementing a double buffered cache for a single object.
 * 
 * Implementing classes can load the object to be cached by implementing the
 * {@link #retrieve()} method.
 * 
 * @param <T>
 *            The type of the object to be cached.
 */
public abstract class DoublyBufferedCache<T> {

    private static final Log log = LogFactory.getLog(DoublyBufferedCache.class);

    private Long timeToLive;

    private long lastRetrieval;

    private T cachedObject;

    private Object lock = new Object();

    private volatile Boolean isLoading = false;

    public T getCachedObject() {
        checkForReload();
        return cachedObject;
    }

    private void checkForReload() {
        if (cachedObject == null || isExpired()) {
            if (!isReloading()) {
                synchronized (lock) {
                    // Recheck expiration because another thread might have
                    // refreshed the cache before we were allowed into the
                    // synchronized block.
                    if (isExpired()) {
                        isLoading = true;
                        try {
                            cachedObject = retrieve();
                            lastRetrieval = System.currentTimeMillis();
                        } catch (Exception e) {
                            log.error("Exception occurred retrieving cached object", e);
                        } finally {
                            isLoading = false;
                        }
                    }
                }
            }
        }
    }

    protected abstract T retrieve() throws Exception;

    private boolean isExpired() {
        return (timeToLive > 0) ? ((System.currentTimeMillis() - lastRetrieval) > (timeToLive * 1000)) : true;
    }

    private boolean isReloading() {
        return cachedObject != null && isLoading;
    }

    public void setTimeToLive(Long timeToLive) {
        this.timeToLive = timeToLive;
    }

}

Ответы [ 4 ]

3 голосов
/ 14 сентября 2009

То, что вы написали, не является потокобезопасным. На самом деле, вы наткнулись на распространенную ошибку, которая является довольно известной проблемой. Это называется проблема двойной проверки блокировки , и многие такие решения, как ваше (и есть несколько вариантов этой темы), имеют проблемы.

Существует несколько возможных решений, но им проще всего просто использовать ScheduledThreadExecutorService и перезагружать то, что вам нужно, каждую минуту или так часто, как вам нужно. Когда вы перезагружаете его, он помещает его в кеш-результат, и вызовы для него просто возвращают последнюю версию. Это потокобезопасно и легко реализуемо. Конечно, он загружается не по требованию, но, кроме начального значения, вы никогда не будете терять производительность при получении значения. Я бы назвал это чрезмерной загрузкой, а не отложенной загрузкой.

Например:

public class Cache<T> {
  private final ScheduledExecutorsService executor =
    Executors.newSingleThreadExecutorService();
  private final Callable<T> method;
  private final Runnable refresh;
  private Future<T> result;
  private final long ttl;

  public Cache(Callable<T> method, long ttl) {
    if (method == null) {
      throw new NullPointerException("method cannot be null");
    }
    if (ttl <= 0) {
      throw new IllegalArgumentException("ttl must be positive");
    }
    this.method = method;
    this.ttl = ttl;

    // initial hits may result in a delay until we've loaded
    // the result once, after which there will never be another
    // delay because we will only refresh with complete results
    result = executor.submit(method);

    // schedule the refresh process
    refresh = new Runnable() {
      public void run() {
        Future<T> future = executor.submit(method);
        future.get();
        result = future;
        executor.schedule(refresh, ttl, TimeUnit.MILLISECONDS);
      }
    }
    executor.schedule(refresh, ttl, TimeUnit.MILLISECONDS);
  }

  public T getResult() {
    return result.get();
  }
}

Это требует небольшого объяснения. По сути, вы создаете универсальный интерфейс для кэширования результата Callable, который будет загружать ваш документ. Отправка Callable (или Runnable) возвращает будущее. Вызов блоков Future.get () до тех пор, пока он не вернется (не завершится).

Итак, что это делает, так это реализует метод get () в терминах Future, чтобы начальные запросы не заканчивались ошибкой (они будут блокироваться). После этого каждые миллисекунды 'ttl' вызывается метод обновления. Он отправляет метод в планировщик и вызывает Future.get (), который возвращает результат и ожидает его завершения. После завершения он заменяет элемент «result». Подпоследовательность вызовов Cache.get () вернет новое значение.

В ScheduledExecutorService есть метод scheduleWithFixedRate (), но я избегаю его, потому что, если Callable занимает больше времени, чем запланированная задержка, у вас будет многократный запуск в одно и то же время, а затем придется беспокоиться об этом или регулировать. Процессу проще представить себя в конце обновления.

0 голосов
/ 14 сентября 2009

Вы пытаетесь заблокировать больше, чем требуется, в вашем хорошем случае (полный и действительный кэш) каждый запрос получает блокировку. вы можете обойтись только блокировкой, если срок действия кэша истек.

Если мы перезагружаемся, ничего не делаем.
Если мы не перезагружаемся, проверьте, не истек ли срок, если не истек, продолжайте. Если мы не перезагружаемся и у нас истек срок действия, истек срок действия блокировки и двойной проверки, чтобы убедиться, что мы не загрузили успешно после последней проверки.

Также обратите внимание, что вы можете перезагрузить кэш в фоновом потоке, поэтому не требуется, чтобы один запрос был heldup, ожидающим заполнения кеша.


    private void checkForReload() {
        if (cachedObject == null || isExpired()) {
                if (!isReloading()) {

                       // Recheck expiration because another thread might have
                       // refreshed the cache before we were allowed into the
                        // synchronized block.
                        if (isExpired()) {
                                synchronized (lock) {
                                        if (isExpired()) {
                                                isLoading = true;
                                                try {
                                                        cachedObject = retrieve();
                                                        lastRetrieval = System.currentTimeMillis();
                                                } catch (Exception e) {
                                                        log.error("Exception occurred retrieving cached object", e);
                                                } finally {
                                                        isLoading = false;
                                                }
                                        }
                                }
                        }
                }
        }

0 голосов
/ 14 сентября 2009

Если вам нужно не время начальной загрузки, а перезагрузка, может быть, вы не против фактического времени перезагрузки, но хотите иметь возможность использовать старую версию при загрузке новой ?

Если вам это нужно, я предлагаю сделать ваш кеш экземпляром (в отличие от статического), доступным в поле.

  1. Вы запускаете перезагрузку каждую минуту с выделенным потоком (или, по крайней мере, не с обычными потоками), чтобы вы не задерживали свои обычные потоки.

  2. Перезагрузка создает новый экземпляр, загружает его с данными (занимает 1 секунду), а затем просто заменяет старый экземпляр новым. (Старый будет собирать мусор.) Замена объекта другим является атомарной операцией .

Анализ : Что происходит в том случае, если любой другой поток может получить доступ к старому кешу до последнего момента?
В худшем случае, инструкция сразу после получения старого экземпляра кэша, другой поток заменяет старый экземпляр новым. Но это не делает ваш код неисправным, так как запрос старого экземпляра кеша по-прежнему будет давать правильное значение, что является приемлемым для требования, которое я дал в первом предложении.

Чтобы сделать ваш код более корректным, вы можете создать свой экземпляр кэша как неизменный (нет доступных средств настройки, нет способа изменить внутреннее состояние). Это проясняет, что правильно использовать его в многопоточном контексте.

0 голосов
/ 14 сентября 2009

Я не уверен, что понимаю вашу потребность. Вам нужна более быстрая загрузка (и перезагрузка) кэша для части значений?

Если это так, я бы предложил разбить вашу структуру данных на более мелкие кусочки . Просто загрузите кусок, который вам нужен в данный момент. Если вы разделите размер на 10, вы поделите время загрузки на что-то, связанное с 10.

Это может относиться к исходному документу, который вы читаете, если это возможно. В противном случае это будет способ чтения, когда вы пропустите большую часть и загрузите только соответствующую часть.

Я считаю, что большинство данных можно разбить на части. Выберите более подходящий, вот примеры:

  • начальная буква: A *, B * ...
  • разделите ваш идентификатор на две части: первая часть - это категория, найдите ее в кэше, загрузите, если необходимо, затем найдите вторую часть внутри.
...