Java-параллелизм дает противоречивый результат. (с замком и LongAdder) - PullRequest
0 голосов
/ 28 июня 2018

Я делаю следующее упражнение:

  1. Напишите программу, которая обходит дерево каталогов и генерирует поток для каждого файла. В темах посчитайте количество слов в файлах и, не используя блокировки, обновить общий счетчик это объявлено как public static long count = 0; Запустите программу несколько раз. Что просходит? Почему?

  2. Исправьте программу предыдущего упражнения с помощью блокировка.

  3. Исправьте программу предыдущего упражнения с помощью LongAdder.

И я написал следующую программу, в которой

  1. CountWordThread ответы на упражнение 1,
  2. CountWordLockThread отвечает на упражнение 2 и
  3. CountWordLongAdderThread ответы на упражнение 3.

Следующий Java-код:

import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;

public class ThreadedCountWord {


    public long count = 0;
    LongAdder la = new LongAdder();

    public class CountWordThread extends Thread {
        private File f;
        CountWordThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        count ++;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    ReentrantLock lock = new ReentrantLock();

    public class CountWordLockThread extends Thread {
        private File f;
        CountWordLockThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        // It's important to wrap your code into a
                        // try/finally block to ensure unlocking in case
                        // of exceptions.
                        lock.lock();
                        try {
                            count++;
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    public class CountWordLongAdderThread extends Thread {
        private File f;
        CountWordLongAdderThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        la.increment();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    public void runThreads(Stream<Path> s) {
        // 1. this MAY get inconsistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordThread t = new CountWordThread(p.toFile());
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(NoLock) count: %d\n", count);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void runThreadsWithLock(Stream<Path> s) {
        // 2. this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordLockThread t = new CountWordLockThread(p.toFile());
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(Lock) count: %d\n", count);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void runThreadsWithLongAdder(Stream<Path> s) {
        // 3. this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ExecutorService executor = Executors.newCachedThreadPool();
            s.forEach(p -> {
                    CountWordLongAdderThread t = new CountWordLongAdderThread(p.toFile());
                    t.start();
                    executor.submit(t);
                });
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
            System.out.printf("(LongAdder) count: %d\n", la.sum());
            la.reset();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        // run multi times
        try {
            for (int i = 0; i < 20; i ++) {
                Path path = Paths.get(".");
                Stream<Path> sp = Files.walk(path);
                Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
                                           && Files.isRegularFile(p)
                                           && Files.isReadable(p));
                ThreadedCountWord tcw = new ThreadedCountWord();
                // tcw.runThreads(s); // 1. this MAY get inconsistent results
                tcw.runThreadsWithLock(s); // 2. this SHOULD NOT get inconsistent results
                // tcw.runThreadsWithLongAdder(s); // 3. this SHOULD NOT get inconsistent results
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Почти каждый раз, когда запускается 2 или 3, я получаю противоречивые ответы. И я не могу понять, почему.

Пример результата будет таким:

(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35862
(Lock) count: 35815 <-- note this
(Lock) count: 35862
(Lock) count: 35862

для упражнения 2 и

(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35826 <-- note this
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862
(LongAdder) count: 35862

для упражнения 3.

Вы можете мне помочь?

обновление

С помощью @chrylis я обновил свои ответы следующим кодом, который работает, как и ожидалось: (Причина, по которой приведенный выше код дает неправильный ответ, - именно то, что говорит @Ivan.

import java.io.*;
import java.util.*;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import java.util.regex.*;

public class ThreadedCountWord {

    public long count = 0;
    LongAdder la = new LongAdder();

    public class CountWordThread extends Thread {
        private File f;
        CountWordThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        count ++;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    ReentrantLock lock = new ReentrantLock();

    public class CountWordLockThread extends Thread {
        private File f;
        CountWordLockThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        // It's important to wrap your code into a
                        // try/finally block to ensure unlocking in case
                        // of exceptions.
                        lock.lock();
                        try {
                            count++;
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    public class CountWordLongAdderThread extends Thread {
        private File f;
        CountWordLongAdderThread(File f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                BufferedReader br = new BufferedReader(new FileReader(f));
                String line;
                String pattern = "(\\w+)";
                Pattern r = Pattern.compile(pattern);
                while ((line = br.readLine()) != null) {
                    Matcher m = r.matcher(line);
                    while(m.find()) {
                        la.increment();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    public void runThreads(Stream<Path> s) {
        // this MAY get inconsistent results
        try {
            count = 0;
            ArrayList<Thread> ts = new ArrayList<>();
            s.forEach(p -> {
                    CountWordThread t = new CountWordThread(p.toFile());
                    t.start();
                    ts.add(t);
                });
            ts.stream().forEach(t -> {
                    try {
                        t.join();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            System.out.printf("(NoLock) count: %d\n", count);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public void runThreadsWithLock(Stream<Path> s) {
        // this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ArrayList<Thread> ts = new ArrayList<>();
            s.forEach(p -> {
                    CountWordLockThread t = new CountWordLockThread(p.toFile());
                    t.start();
                    ts.add(t);
                });
            ts.stream().forEach(t -> {
                    try {
                        t.join();   
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                });
            System.out.printf("(Lock) count: %d\n", count);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void runThreadsWithLongAdder(Stream<Path> s) {
        // this SHOULD NOT generate in-consistent results
        try {
            count = 0;
            ArrayList<Thread> ts = new ArrayList<>();
            s.forEach(p -> {
                    CountWordLongAdderThread t = new CountWordLongAdderThread(p.toFile());
                    t.start();
                    ts.add(t);
                });
            ts.stream().forEach(t -> {
                    try {
                        t.join();   
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            System.out.printf("(LongAdder) count: %d\n", la.sum());
            la.reset();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        // run multi times
        try {
            for (int i = 0; i < 20; i ++) {
                Path path = Paths.get(".");
                Stream<Path> sp = Files.walk(path);
                Stream<Path> s = sp.filter(p -> p.toString().endsWith(".java")
                                           && Files.isRegularFile(p)
                                           && Files.isReadable(p));
                ThreadedCountWord tcw = new ThreadedCountWord();
                // tcw.runThreads(s); // this MAY get inconsistent results
                // tcw.runThreadsWithLock(s); // this SHOULD NOT get inconsistent results
                tcw.runThreadsWithLongAdder(s); // this SHOULD NOT get inconsistent results
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1 Ответ

0 голосов
/ 28 июня 2018

Вы запускаете свои задания дважды: первый раз с t.start() и второй раз при отправке исполнителю. И поскольку вы не вызываете t.join() после t.start(), чтобы дождаться завершения задач, вы можете получить противоречивый результат только потому, что вы печатаете значение до того, как вся работа будет выполнена

...