Фильтрация параллельных коллекций Scala с ранним прерыванием при достижении желаемого количества результатов - PullRequest
10 голосов
/ 10 ноября 2011

Учитывая очень большой экземпляр collection.parallel.mutable.ParHashMap (или любой другой параллельный сбор), как можно прервать фильтрующее параллельное сканирование, если заданное, скажем, 50 совпадений было найденный ?

Попытка накапливать промежуточные совпадения в поточно-ориентированной «внешней» структуре данных или сохранение внешнего AtomicInteger с счетчиком результатов кажется в 2–3 раза медленнее на 4 ядрах, чем при использовании обычного collection.mutable.HashMap и привязка одного ядра к 100%.

Мне известно, что find или существует в коллекциях Par * отменяется "изнутри". Есть ли способ обобщить это, чтобы найти более одного результата?

Вот код, который все еще кажется в 2-3 раза медленнее в ParHashMap с ~ 79 000 записей, а также имеет проблему с заполнением больше , чем maxResults , приводит к результатам CHM (Вероятно, это связано с тем, что поток прерывается после incrementAndGet , но до break , что позволяет другим потокам добавлять больше элементов). Обновление: кажется, замедление происходит из-за рабочих потоков, конкурирующих с counter.incrementAndGet (), что, конечно, не соответствует цели всего параллельного сканирования: - (

def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[Key,  Node](maxResults)

  import util.control.Breaks._

  breakable
  {
    for ((key, node) <- parHashMap if filter(node))
    {
      results.put(key, node)
      val total = counter.incrementAndGet()
      if (total > maxResults) break
    }
  }

  results.values.toArray(new Array[Node](results.size))
}

Ответы [ 3 ]

2 голосов
/ 24 ноября 2011

Сначала я бы сделал параллельное сканирование, в котором переменная maxResults была бы локально потоковой.Это приведет к результатам (maxResults * numberOfThreads).

Затем я выполню однопоточное сканирование, чтобы уменьшить его до maxResults.

1 голос
/ 18 января 2013

Я провел интересное расследование по вашему делу.

Расследование рассуждений

Я подозревал, что проблема связана с изменчивостью входных данных Map, и я постараюсь объяснить вам, почему: Реализация HashMap организует данные в разные сегменты, как можно видеть в Википедии.

Wikipedia HashMap

Первые поточно-ориентированные коллекции в Java, синхронизированные коллекции были основаны на синхронизации всех методов базовой реализации и привели к снижению производительности. Дальнейшие исследования и размышления привели к созданию более производительной коллекции Concurrent Collection, такой как ConcurrentHashMap, подход которой был более умным: почему бы нам не защитить каждый контейнер определенной блокировкой?

По моим ощущениям, проблема с производительностью возникает из-за:

  • при параллельной работе вашего фильтра некоторые потоки будут конфликтовать при одновременном доступе к одному и тому же блоку и попадать в ту же блокировку, потому что ваша карта изменяемая .
  • У вас есть счетчик, чтобы увидеть, сколько результатов у вас есть, в то время как вы можете реально проверить размер результат. Если у вас есть потокобезопасный способ создания коллекции, вам также не нужен многопоточный счетчик.

Результат расследования

Я разработал тестовый пример и обнаружил Я ошибся . Проблема заключается в параллельном характере выходной карты. Фактически, именно здесь происходит столкновение, когда вы размещаете элементы на карте, а не когда вы итерируете ее. Кроме того, поскольку вам нужен только результат для значений, вам не нужны ключи, хеширование и все функции карты. Может быть интересно проверить, удаляете ли вы AtomicCounter и используете ли вы только карту result, чтобы проверить, собрали ли вы достаточно элементов, как работает ваша версия.

Пожалуйста, будьте осторожны со следующим кодом в Scala 2.9.2. В другом посте я объясняю, почему мне нужны две разные функции для параллельной и непараллельной версии: Вызов карты в параллельной коллекции через ссылку на тип предка

object MapPerformance {

  val size = 100000
  val items = Seq.tabulate(size)( x => (x,x*2))


  val concurrentParallelMap = ImmutableParHashMap(items:_*)

  val concurrentMutableParallelMap = MutableParHashMap(items:_*)

  val unparallelMap = Map(items:_*)


  class ThreadSafeIndexedSeqBuilder[T](maxSize:Int) {
    val underlyingBuilder = new VectorBuilder[T]()
    var counter = 0
    def sizeHint(hint:Int) { underlyingBuilder.sizeHint(hint) }
    def +=(item:T):Boolean ={
      synchronized{
        if(counter>=maxSize)
          false
        else{
          underlyingBuilder+=item
          counter+=1
          true
        }
      }
    }
    def result():Vector[T] = underlyingBuilder.result()

  }

  def find(map:ParMap[Int,Int],filter: Int => Boolean, maxResults: Int): Iterable[Int] =
  {

    // we already know the maximum size
    val resultsBuilder = new ThreadSafeIndexedSeqBuilder[Int](maxResults)
    resultsBuilder.sizeHint(maxResults)

    import util.control.Breaks._
    breakable
    {
      for ((key, node) <- map if filter(node))
      {
        val newItemAdded = resultsBuilder+=node
        if (!newItemAdded)
          break()

      }
    }
    resultsBuilder.result().seq

  }

  def findUnParallel(map:Map[Int,Int],filter: Int => Boolean, maxResults: Int): Iterable[Int] =
  {

    // we already know the maximum size
    val resultsBuilder = Array.newBuilder[Int]
    resultsBuilder.sizeHint(maxResults)

    var counter = 0
      for {
        (key, node) <- map if filter(node)
        if counter < maxResults
      }{
        resultsBuilder+=node
        counter+=1
      }

    resultsBuilder.result()

  }

  def measureTime[K](f: => K):(Long,K) = {
    val startMutable = System.currentTimeMillis()
    val result = f
    val endMutable = System.currentTimeMillis()
    (endMutable-startMutable,result)
  }

  def main(args:Array[String]) = {
    val maxResultSetting=10
    (1 to 10).foreach{
      tryNumber =>
        println("Try number " +tryNumber)
        val (mutableTime, mutableResult) = measureTime(find(concurrentMutableParallelMap,_%2==0,maxResultSetting))
        val (immutableTime, immutableResult) = measureTime(find(concurrentMutableParallelMap,_%2==0,maxResultSetting))
        val (unparallelTime, unparallelResult) = measureTime(findUnParallel(unparallelMap,_%2==0,maxResultSetting))
        assert(mutableResult.size==maxResultSetting)
        assert(immutableResult.size==maxResultSetting)
        assert(unparallelResult.size==maxResultSetting)
        println(" The mutable version has taken " + mutableTime + " milliseconds")
        println(" The immutable version has taken " + immutableTime + " milliseconds")
        println(" The unparallel version has taken " + unparallelTime + " milliseconds")
     }
  }

}

С этим кодом у меня есть систематическая параллель (как изменяемая, так и неизменная версия входной карты) примерно в 3,5 раза быстрее, чем непараллельная на моей машине.

0 голосов
/ 10 ноября 2011

Вы можете попытаться получить итератор, а затем создать ленивый список (поток), в котором вы фильтруете (с вашим предикатом) и берете желаемое количество элементов. Поскольку это не строгое, это «взятие» элементов не оценивается. После этого вы можете принудительно выполнить выполнение, добавив «.par» ко всему этому и достигнув распараллеливания.

Пример кода:

Распараллеленная карта со случайными значениями (имитирующая вашу параллельную хэш-карту):

scala> myMap
res14: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(66978401 -> -1331298976, 256964068 -> 126442706, 1698061835 -> 1622679396, -1556333580 -> -1737927220, 791194343 -> -591951714, -1907806173 -> 365922424, 1970481797 -> 162004380, -475841243 -> -445098544, -33856724 -> -1418863050, 1851826878 -> 64176692, 1797820893 -> 405915272, -1838192182 -> 1152824098, 1028423518 -> -2124589278, -670924872 -> 1056679706, 1530917115 -> 1265988738, -808655189 -> -1742792788, 873935965 -> 733748120, -1026980400 -> -163182914, 576661388 -> 900607992, -1950678599 -> -731236098)

Получите итератор и создайте поток из итератора и отфильтруйте его. В этом случае мой предикат принимает только пары (значения элемента карты). Я хочу получить 10 четных элементов, поэтому я беру 10 элементов, которые будут оцениваться, только когда я заставлю его:

scala> val mapIterator = myMap.toIterator
mapIterator: Iterator[(Int, Int)] = HashTrieIterator(20)


scala> val r = Stream.continually(mapIterator.next()).filter(_._2 % 2 == 0).take(10)
r: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), ?)

Наконец, я вынуждаю оценку, которая получает только 10 элементов, как запланировано

scala> r.force
res16: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), (256964068,126442706), (1698061835,1622679396), (-1556333580,-1737927220), (791194343,-591951714), (-1907806173,365922424), (1970481797,162004380), (-475841243,-445098544), (-33856724,-1418863050), (1851826878,64176692))

Таким образом, вы получаете только желаемое количество элементов (без необходимости обработки оставшихся элементов) и распараллеливаете процесс без блокировок, атомизации или разрывов.

Пожалуйста, сравните это с вашими решениями, чтобы увидеть, насколько это хорошо.

...