JavaFX: запуск параллельных потоков и обновление общего прогресса до ProgressBar - PullRequest
0 голосов
/ 31 октября 2018

Мне нужно разделить некоторую рабочую нагрузку на потоки и запустить их параллельно , поскольку они независимы. Я также хочу отобразить общий прогресс в ProgressBar с JavaFx. Это означает, что индикатор выполнения показывает общую работу, проделанную каждым потоком до сих пор .

Ради простоты мы можем взять этот Counter класс в качестве примера

public class Counter implements Runnable {
    private int from, to;

    public Counter(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public void run() {
        for (int i = from; i < to ; i++) {
            // Do some heavy operation
            // report about progress to the parent
        }

        // exit thread with status of success or failure
    }
}

Этот класс принимает от, до, в качестве граничных условий.

Чтобы не блокировать пользовательский интерфейс, я использую простой Task класс, подобный этому

public class MyTask extends Task<Integer> {
    int iter;

    public MyTask(int iter) {
        this.iter = iter;
    }

    @Override
    protected Integer call() throws Exception {
        // Simply divide work load to each thread
        int workers = 8;
        int limit = iter/workers;
        int rem = iter % workers;

        // Creates a executor
        ExecutorService executorService = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            int start = limit * i;
            int end = limit * (i + 1);
            if (i == workers - 1) end += rem;

            Counter counter = new Counter(start, end);
            executorService.submit(counter); // Submit work to be done
        }

        executorService.shutdown(); // start the execution

        // Get their progress, update overall progress to UI
        // Stop after all threads finished
    }
}

В MyTask я хочу обновить пользовательский интерфейс, как указано в комментариях, с полным завершением. (т.е. общее количество выполненных каждой цепочкой).

Есть ли способ сделать это? агрегировать прогресс параллельных задач и обновлять общий прогресс в пользовательском интерфейсе (я не рассчитываю на количество завершенных потоков, это текущий прогресс каждого потока, о котором я хочу сообщить в MyTask).

Ответы [ 2 ]

0 голосов
/ 03 ноября 2018

Я решил проблему с помощью класса PropertyChangeSupport. Это обеспечивает потокобезопасные свойства, а также обеспечивает прослушиватели свойств. Больше от здесь .

import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;

public final class ProgressReporter {
    private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
    private int progress = 0;

    public void addPropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.addPropertyChangeListener(listener);
    }

    public void removePropertyChangeListener(PropertyChangeListener listener) {
        propertyChangeSupport.removePropertyChangeListener(listener);
    }

    public void accumulateProgress(int progress){
        this.propertyChangeSupport.firePropertyChange("progress", this.progress, this.progress + progress);
        this.progress += progress;
    }

    public int getProgress() {
        return progress;
    }
}

Теперь, слушая ProgressReporter, мы можем получить прогресс, когда поступят новые данные. Обратите внимание, что firePropertyChange срабатывает только тогда, когда значения old и new отличаются , в противном случае обновление не будет выполняться для слушателей.

Теперь мы создаем класс Counter, чтобы использовать этот ProgressReporter

public class Counter implements Runnable {
    private int id, from, to, sleep;
    private ProgressReporter reporter;

    public Counter(int id, int from, int to, int sleep, ProgressReporter reporter) {
        this.from = from;
        this.to = to;
        this.sleep = sleep;
        this.id = id;
        this.reporter = reporter;

        System.out.println("Thread #" + id + " started delay=" + sleep);
    }

    @Override
    public void run() {
        for (int i = from; i < to ; i++) {
            try {
                Thread.sleep(sleep);
                reporter.accumulateProgress(1); // this will fire an update to listeners
            } catch (InterruptedException e){

            }
        }

        System.out.println("Thread #" + id + " is completed");
    }
}

Теперь в Задании, инициированном потоком JavaFX, реализованном так:

public class MyTask extends Task<Integer> {
    int iterations;
    Random random = new Random();
    ProgressReporter reporter = new ProgressReporter();

    public MyTask(int iterations) {
        this.iterations = iterations;
    }

    @Override
    protected Integer call() throws Exception {
        // Simply divide work load to each thread
        int workers = 8;
        int limit = iterations /workers;
        int rem = iterations % workers;

        // add a property listener for progress update
        reporter.addPropertyChangeListener(new PropertyChangeListener() {
            @Override
            public void propertyChange(PropertyChangeEvent evt) {
                updateProgress((int) evt.getNewValue(), iterations);
            }
        });

        // Creates a executor
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < workers; i++) {
            int start = limit * i;
            int end = limit * (i + 1);
            if (i == workers - 1) end += rem;

            Counter counter = new Counter(i ,start, end, random.nextInt(1000), reporter);
            executorService.submit(counter); // Submit work to be done
        }

        executorService.shutdown(); // shutdown executor not to accept any more threads
        while (!executorService.isTerminated()){
            if (isCancelled()){
                executorService.shutdownNow(); // stop all the processes immediately
            }
        }

        return reporter.getProgress();
    }
}

Теперь обычные привязки JavaFX, такие как

progressBar.progressProperty().bind(task.progressProperty())

Полный исходный код можно найти в здесь .

0 голосов
/ 31 октября 2018

В зависимости от количества параллельных задач, вы можете просто использовать Task s для логики Counter и обновлять общий прогресс от слушателей до progress свойств этих задач.

Если слишком много из этих задач выполняется параллельно, это может замедлить поток приложения JavaFX, так как слишком много Runnable s может ожидать выполнения одновременно.

Вы можете реализовать обновления на основе разницы в прогрессе самостоятельно, используя оператор synchronized. Следующий код запускает обновления из потока приложения javafx для простоты, но эта логика может быть перемещена в другой поток без каких-либо проблем:

@Override
public void start(Stage primaryStage) {
    ProgressBar progressBar = new ProgressBar();

    int workers = 8;

    ExecutorService executorService = Executors.newFixedThreadPool(workers);

    final int taskCount = 12;
    final int elementsPerTask = 50;
    final int elementCount = elementsPerTask * taskCount;

    ProgressReceiver progressReceiver = new ProgressReceiver() {

        private boolean updating = false;
        private int progress = 0;

        @Override
        public void acceptProgress(int oldValue, int newValue) {
            synchronized(this) {
                progress += newValue - oldValue;
                if (!updating) {
                    updating = true;
                    Platform.runLater(() -> {
                        synchronized (this) {
                            updating = false;
                            progressBar.setProgress(((double) progress) / elementCount);
                        }
                    });
                }
            }
        }

    };
    for (int i = 0; i < taskCount; i++) {
        int start = elementsPerTask * i;
        int end = elementsPerTask * (i + 1);

        Counter counter = new Counter(start, end, progressReceiver);
        executorService.submit(counter);
    }

    executorService.shutdown();

    StackPane root = new StackPane(progressBar);

    Scene scene = new Scene(root, 300, 300);

    primaryStage.setScene(scene);
    primaryStage.show();
}
public interface ProgressReceiver {
    void acceptProgress(int oldValue, int newValue);
}
public class Counter implements Runnable {

    private final ProgressReceiver progressReceiver;

    private final int from, to;

    public Counter(int from, int to, ProgressReceiver progressReceiver) {
        this.from = from;
        this.to = to;
        this.progressReceiver = progressReceiver;
    }

    @Override
    public void run() {
        for (int i = from; i < to; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            int oldProgress = i - from;
            progressReceiver.acceptProgress(oldProgress, oldProgress + 1);
        }

        // exit thread with status of success or failure
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...