Неправильный вывод из-за многопоточности в WebFlux - PullRequest
0 голосов
/ 20 июня 2020

Когда я пытаюсь выполнить Код, результат не такой, как я его жду.

Я пробую первый тест диапазон чисел от 1 до max = 200000 параллельно, используя WebFlux (Flux) в файле, чтобы записать их.

И второй тест , я пытаюсь проверить первый тест, но я обнаружил, что не все числа были переданы (покрыты), и когда Первый тест завершился sh, второй тест должен подождать, я не знаю почему?

Результат после ожидания: Сканирование в числах: [19.601], максимальное значение: [19.978 ]

Но не дожидаясь, Сканирование в числах: [0], а максимальное значение: [0]

Может кто-нибудь, пожалуйста, объясните, почему диапазон числа, которое не было охвачено, и почему я должен ждать в следующем тесте, и почему оператор после распараллеливания был выполнен до того, как диапазон был завершен sh?


package loadtest;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class LoadTest {

    private static final String FILE_NAME = System.getProperty("user.home") + "/Desktop/test-parallel.txt";

    private final int max = 20000;


    @Test
    public void test01_WriteParallel() throws Exception {

        Files.deleteIfExists(Paths.get(FILE_NAME));

        FileWriter fileWriter = new FileWriter(FILE_NAME);

        PrintWriter printWriter = new PrintWriter(fileWriter);

        Runtime runtime = Runtime.getRuntime();

        // get the number of processors available to the Java virtual machine
        int numberOfProcessors = runtime.availableProcessors();

        System.out.println("Number of processors available to this JVM: " + numberOfProcessors);

        long start = System.currentTimeMillis();

        Flux.range(1, max)
            .parallel(numberOfProcessors)
            .runOn(Schedulers.parallel())
            .subscribe(i -> printWriter.printf(String.format("%s%n", i)));


        System.out.println(String.format("It was ended in: %sms", (System.currentTimeMillis() - start)));


    }

    @Test
    public void test02_GetMaxOfRange() throws InterruptedException{

       Thread.sleep(15000);
        final List<Integer> integers = new ArrayList<>();

        try {
            try (final BufferedReader br = new BufferedReader(new FileReader(FILE_NAME))) {
                String line;
                while ((line = br.readLine()) != null) {

                    integers.add(Integer.valueOf(line));
                }

            }
        } catch (final IOException e) {
            e.printStackTrace();
        }

        int max = integers.parallelStream()
                          .mapToInt(t -> t)
                          .max().orElse(0);

        System.out.println(String.format("Scanning in numbers: [%,d], and the max is: [%,d]", integers.size(), max));

    }


}

...