Эффективный алгоритм распределения работы? - PullRequest
1 голос
/ 28 марта 2010

Это немного сложно объяснить, но здесь мы идем. По сути, проблема заключается в том, «как эффективно разбить проблемы на подзадачи». «Эффективный» здесь означает, что разбитые подзадачи как можно больше. По сути, было бы идеально, если бы мне вообще не приходилось разбирать проблемы. Тем не менее, поскольку работник может работать только над определенными блоками проблем, мне нужно расстаться. Но я хочу найти способ сделать это как можно более грубым.

Вот какой-то псевдокод ..

У нас есть такие проблемы (извините, это на Java. Если вы не понимаете, я был бы рад объяснить).

class Problem {
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 };
    final Data data = //Some data
}

И подзадача:

class SubProblem {
    final Set<Integer> targetedSectionIds;
    final Data data;

    SubProblem(Set<Integer> targetedSectionsIds, Data data){
        this.targetedSectionIds = targetedSectionIds;
        this.data = data;
    }
}

Тогда работа будет выглядеть так.

class Work implements Runnable {
    final Set<Section> subSections;
    final Data data;
    final Result result;

    Work(Set<Section> subSections, Data data) {
        this.sections = SubSections;
        this.data = data;
    }

    @Override
    public void run(){
        for(Section section : subSections){
            result.addUp(compute(data, section));
        }
    }
}

Теперь у нас есть экземпляры «Работника», которые имеют свое собственное состояние sections I have.

class Worker implements ExecutorService {
    final Map<Integer,Section> sectionsIHave;
    {
        sectionsIHave = {1:section1, 5:section5, 8:section8 };
    }

    final ExecutorService executor = //some executor.

    @Override
    public void execute(SubProblem problem){
        Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds);
        super.execute(new Work(sectionsNeeded, problem.data);
    }

}

уф.

Итак, у нас много Problem с, и Workers постоянно просят больше SubProblems. Моя задача разбить Problems на SubProblem и отдать им. Сложность, однако, заключается в том, что позже мне нужно собрать все результаты для подзадач и объединить (уменьшить) их в Result для всего Problem.

Это, однако, дорого, поэтому я хочу дать рабочим "куски" как можно большего размера (имеет как можно больше targetedSections).

Это не обязательно должно быть идеально (математически настолько эффективно, насколько это возможно или что-то в этом роде). Я имею в виду, я думаю, что невозможно найти идеальное решение, потому что вы не можете предсказать, сколько времени займет каждое вычисление и т. Д. Но есть ли хорошее эвристическое решение для этого? Или, может быть, некоторые ресурсы, которые я могу прочитать, прежде чем приступить к проектированию?

Любой совет высоко ценится!

EDIT: У нас также есть контроль над Распределением Разделов, так что это еще один вариант. По сути, единственным ограничением является то, что рабочий может иметь только фиксированное количество секций.

1 Ответ

1 голос
/ 28 марта 2010

Хорошо, похоже, у вас есть модель шардинга для ваших сетевых сервисов, мы делаем нечто подобное и используем обратный индекс «entityId» (sectionId) для «клиента» (работника), который подключится к конкретной сети сервис, который будет иметь дело с этим конкретным лицом. Простейший метод (см. Ниже) - использовать обратную карту id для работника. Если память является ограничением, другой возможностью является использование функции (например, sectionId% количество сервисов).

Чтобы сервисы работали как можно больше, существует простой алгоритм пакетной обработки, который будет заполнять пакет до определенного пользователем максимального значения. Это позволит приблизительно распределить объемы работы в соответствии с тем, насколько быстро удаленные службы могут их использовать.

public class Worker implements Runnable {

    private final Map<Integer, Section> sections;
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096);
    private final int batchSize;

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) {
        this.sections = sectionsIHave;
        this.batchSize = batchSize;
    }

    public Set<Integer> getSectionIds() {
        return sections.keySet();
    }

    public void execute(final SubProblem command) throws InterruptedException {

        if (sections.containsKey(command.getSectionId())) {
            problemQ.put(command);
        } else {
            throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId());
        }

    }

    @Override
    public void run() {
        final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize);
        while (!Thread.interrupted()) {
            batch.clear();

            try {
                batch.add(problemQ.take());
                for (int i = 1; i < batchSize; i++) {
                    final SubProblem problem = problemQ.poll();
                    if (problem != null) {
                        batch.add(problem);
                    } else {
                        break;
                    }

                    process(batch);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void process(final List<SubProblem> batch) {
        // Submit to remote process.
    }

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) {
        final Map<Integer, Worker> temp = new HashMap<Integer, Worker>();
        for (final Worker worker : workers) {
            for (final Integer sectionId : worker.getSectionIds()) {
                temp.put(sectionId, worker);
            }
        }
        return Collections.unmodifiableMap(temp);
    }

    public static void main(final String[] args) throws InterruptedException {
     // Load workers, where worker is bound to single remote service
        final List<Worker> workers = getWorkers();
        final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers);
        final List<SubProblem> subProblems = getSubProblems();
        for (final SubProblem problem : subProblems) {
            final Worker w = workerReverseIndex.get(problem.getSectionId());
            w.execute(problem);
        }
    }
}
...