путаница в параллельном потоке Java 8 / проблема - PullRequest
0 голосов
/ 16 октября 2018

Я новичок в параллельном потоке и пытаюсь создать 1 пример программы, которая рассчитает значение * 100 (от 1 до 100) и сохранит его на карте.Выполняя код, я получаю различный счет на каждой итерации.Возможно, я где-то ошибаюсь, поэтому, пожалуйста, подскажите, кто знает, как это сделать.

code :

import java.util.*;
import java.lang.*;
import java.io.*;
import java.util.stream.Collectors;

public class Main{    
    static int l = 0;       
    public static void main (String[] args) throws java.lang.Exception {
        letsGoParallel();
    }       
    public static int makeSomeMagic(int data) {
        l++;
        return data * 100;
    }        
    public static void letsGoParallel() {
        List<Integer> dataList = new ArrayList<>();
        for(int i = 1; i <= 100 ; i++) {
            dataList.add(i);
        }
        Map<Integer, Integer> resultMap = new HashMap<>();
        dataList.parallelStream().map(f -> {
            Integer xx = 0;
            {
                xx = makeSomeMagic(f);
            }
            resultMap.put(f, xx);
            return 0;
        }).collect(Collectors.toList());
        System.out.println("Input Size: " + dataList.size());
        System.out.println("Size: " + resultMap.size());
        System.out.println("Function Called: " + l);
    }
}

Runnable Code

Последний вывод

Размер ввода: 100

Размер: 100

Вызванная функция: 98

При каждом запуске прогон отличается.Я хочу использовать параллельный поток в моем собственном приложении, но из-за этой путаницы / проблемы я не могу.В моем приложении 100-200 уникальных номеров, над которыми нужно выполнить одну и ту же операцию.Короче говоря, есть функция, которая обрабатывает что-то.

Ответы [ 4 ]

0 голосов
/ 16 октября 2018
  • Нет необходимости считать, сколько раз был вызван метод.
  • Stream поможет вам выполнить цикл в байтовом коде.
  • Передайте свою логику (функцию) на Stream, не используйте никакие поточно-ориентированные переменные в многопоточности (включая parallelStream)

, как это.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ParallelStreamClient {
//  static int l = 0;---> no need to count times.

    public static void main(String[] args) throws java.lang.Exception {
        letsGoParallel();
    }

    public static int makeSomeMagic(int data) {
//  l++;-----> this is no thread-safe way
    return data * 100;
}

public static void letsGoParallel() {
    List<Integer> dataList = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        dataList.add(i);
    }
    Map<Integer, Integer> resultMap =         
    dataList.parallelStream().collect(Collectors.toMap(i -> i,ParallelStreamClient::makeSomeMagic));
    System.out.println("Input Size: " + dataList.size());
    System.out.println("Size: " + resultMap.size());
    //System.out.println("Function Called: " + l);       
}
0 голосов
/ 16 октября 2018

При вводе некоторых значений в resultMap вы используете побочный эффект :

 dataList.parallelStream().map(f -> {
            Integer xx = 0;
            {
                xx = makeSomeMagic(f);
            }
            resultMap.put(f, xx);
            return 0;
        })

API заявляет:

Операции без сохранения состояния, такие как фильтр и отображение, не сохраняют состояния от ранее увиденного элемента при обработке нового элемента - каждый элемент может обрабатываться независимо от операций над другими элементами.

Продолжение с :

Результаты конвейерного потока могут быть недетерминированными или неверными, если поведенческие параметры для операций потока являются состоящими.Лямбда с состоянием (или другой объект, реализующий соответствующий функциональный интерфейс) - это тот, чей результат зависит от любого состояния, которое может измениться во время выполнения потокового конвейера.

Это следует примеру, подобному вашему, показывающему:

... если операция отображения выполняется параллельно, результаты для одного и того же ввода могут отличаться от запуска к выполнению из-за различий в расписании потоков, тогда как при использовании лямбда-выражения без сохранения состояния результаты будутвсегда будет одинаковым.

Это объясняет ваше наблюдение: При каждом запуске прогона выводится разный.

Правильный подход показан @ Эран

0 голосов
/ 16 октября 2018

Надеюсь, все работает отлично.сделав Synchronied функцию makeSomeMagic и используя структуру данных Threadsafe ConcurrentHashMap и напишите простое утверждение

dataList.parallelStream().forEach(f -> resultMap.put(f, makeSomeMagic(f)));

Весь код здесь:

import java.util.*;
import java.lang.*;
import java.io.*;
import java.util.stream.Collectors;

public class Main{  
static int l = 0;
  public static void main (String[] args) throws java.lang.Exception {
    letsGoParallel();
  }
  public synchronized static int makeSomeMagic( int data) { // make it synchonized
    l++;
    return data * 100;
  }
  public static void letsGoParallel() {
    List<Integer> dataList = new ArrayList<>();
    for(int i = 1; i <= 100 ; i++) {
      dataList.add(i);
    }
    Map<Integer, Integer> resultMap = new ConcurrentHashMap<>();// use ConcurrentHashMap
    dataList.parallelStream().forEach(f -> resultMap.put(f, makeSomeMagic(f)));
    System.out.println("Input Size: " + dataList.size());
    System.out.println("Size: " + resultMap.size());
    System.out.println("Function Called: " + l);
  }
}
0 голосов
/ 16 октября 2018

Ваш доступ как к HashMap, так и к переменной l является не поточно-ориентированным, поэтому выход при каждом запуске отличается.

Правильный путьчтобы сделать то, что вы пытаетесь сделать, это собрать элементы Stream в Map:

Map<Integer, Integer> resultMap =
    dataList.parallelStream()
            .collect(Collectors.toMap (Function.identity (), Main::makeSomeMagic));

РЕДАКТИРОВАТЬ: переменная l все еще обновляется в потоке , а не безопасный способ с этим кодом, так что вам придется добавить свою собственную безопасность потока, если для вас важно конечное значение переменной.

...