Исправление и динамическая настройка количества рабочих потоков. - PullRequest
0 голосов
/ 21 апреля 2019

В настоящее время я реализую контейнер данных / структуру данных, которая имеет метод addRecord (конечная запись Redcord) и метод close ().

public class Container{

    Container() {
    }

    public void addRecord(final Record record){
        // do something with the record
        // add the modified it to the container
    }

    public void close(){

    }     
}

Поскольку для сохранения записи в контейнере необходимо выполнить несколько шагов, я хочу передать эти шаги в потоки, прежде чем окончательно добавить их в контейнер.

Моя проблема в том, что я не хочу, чтобы какой-либо экземпляр контейнера использовал больше, чем, скажем, 4 потока, и что если у меня одновременно открыто несколько экземпляров контейнера, то не более, например, 12 потоков следует использовать. Кроме того, я хотел бы, чтобы после создания 4-го контейнера каждый из трех открытых контейнеров терял один из своих потоков, так что все 4 контейнера могут использовать 3 потока каждый.

Я искал ThreadPools и ExecutorServices. Хотя настройка пула потоков размером 12 проста, трудно ограничить число используемых потоков до 4 при использовании ExecutorServices. Напомним, я хочу не более 12 потоков, поэтому у меня есть статический экземпляр размера 12, который я разделяю между экземплярами контейнера.

Это то, с чего я начал

public class Container{

    public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);

    final List<Future<Entry>> _fRecords = new LinkedList<>();

    Container() {
    }

    public void addRecord(final Record record){
        _fRecords.add(SERVICE.submit(new Callable<Entry>{

            @Override
            public Entry call() throws Exception {
                return record.createEntry();
            }
        }));
    }

    public void close() throws Exception {
        for(final Future<Entry> f) {
            f.get(); // and add the entry to the container           
        }
    }     
}

Ясно, что это не гарантирует, что максимум 4 потока используются одним экземпляром. Более того, я не понимаю, как эти службы Executor могут быть принудительно закрыты или переназначены, если для другого контейнера требуется пара потоков.

Вопросы:

  1. Поскольку сам Redord является интерфейсом, а время выполнения createEntry () зависит от фактической реализации, я хочу убедиться, что разные контейнеры не используют одни и те же потоки / работники, т.е. я не хочу что любой поток выполняет работу для двух разных контейнеров. Идея заключается в том, что я не хочу, чтобы экземпляр контейнера, который обрабатывает только «долго выполняющиеся» записи, замедлялся контейнерами, которые имеют дело только с «коротко работающими записями». Если это концептуально не имеет смысла (вопрос к вам), не было бы необходимости иметь изолированные потоки / рабочие элементы для разных контейнеров.

  2. Является ли реализация моего собственного ExecutorService правильным способом? Может кто-нибудь указать мне на проект, который имеет аналогичную проблему / решение? В противном случае, я полагаю, мне придется реализовать свой собственный ExecutorService и поделиться им среди моих Контейнеров, или есть лучшее решение?

  3. В настоящее время я собираю все фьючерсы в методе close, поскольку контейнер должен хранить записи / записи в порядке их поступления. Очевидно, что это требует много времени, и поэтому я хотел бы сделать это, как только будущее закончится. Имеет ли смысл, чтобы дополнительный работник обрабатывал только фьючерсы, или лучше, чтобы мой работник выполнял эту работу, т. Е. Имел взаимодействие с потоками.

1 Ответ

0 голосов
/ 21 апреля 2019

Я не думаю, что среда выполнения Java предоставляет что-то, что отвечает вашим потребностям, из коробки.

Я написал свой собственный класс для удовлетворения таких потребностей (общий пул + ограничение для данной очереди).

public static class ThreadLimitedQueue {
    private final ExecutorService executorService;
    private final int limit;

    private final Object lock = new Object();
    private final LinkedList<Runnable> pending = new LinkedList<>();
    private int inProgress = 0;

    public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
        this.executorService = executorService;
        this.limit = limit;
    }

    public void submit(Runnable runnable) {
        final Runnable wrapped = () -> {
            try {
                runnable.run();
            } finally {
                onComplete();
            }
        };
        synchronized (lock) {
            if (inProgress < limit) {
                inProgress++;
                executorService.submit(wrapped);
            } else {
                pending.add(wrapped);
            }
        }
    }

    private void onComplete() {
        synchronized (lock) {
            final Runnable pending = this.pending.poll();
            if (pending == null || inProgress > limit) {
                inProgress--;
            } else {
                executorService.submit(pending);
            }
        }
    }
}

Единственное отличие в моем случае limit - это постоянное значение, но вы можете изменить его.Например, вы можете заменить int limit на Supplier<Integer> limitFunction, и эта функция должна обеспечивать динамическое ограничение, например,

Supplier<Integer> limitFunction = 12 / containersList.size();

Просто сделайте его более надежным (например, что делать, если containerList пуст или превышает 12)

...