Пример агрегатной функции Scala - PullRequest
34 голосов
/ 03 августа 2011

Я искал и не могу найти пример или обсуждение функции aggregate в Scala, которую я могу понять.Это кажется довольно мощным.

Можно ли использовать эту функцию, чтобы уменьшить значения кортежей, чтобы сделать коллекцию типа нескольких карт?Например:

val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))

После применения агрегата:

Seq(("one" -> Seq("i","1")), ("two" -> Seq("2", "ii")), ("four" -> Seq("iv"))

Также, можете ли вы привести пример параметров z, segop и combop?Мне неясно, что делают эти параметры.

Ответы [ 7 ]

92 голосов
/ 04 августа 2011

Посмотрим, не поможет ли какое-нибудь искусство ascii.Рассмотрим сигнатуру типа aggregate:

def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B

Также обратите внимание, что A относится к типу коллекции.Итак, скажем, у нас есть 4 элемента в этой коллекции, тогда aggregate может работать так:

z   A   z   A   z   A   z   A
 \ /     \ /seqop\ /     \ /    
  B       B       B       B
    \   /  combop   \   /
      B _           _ B
         \ combop  /
              B

Давайте посмотрим на практический пример этого.Скажем, у меня есть GenSeq("This", "is", "an", "example"), и я хочу знать, сколько в нем символов.Я могу написать следующее:

Обратите внимание на использование par в приведенном ниже фрагменте кода.Вторая функция, переданная агрегату, - это то, что вызывается после вычисления отдельных последовательностей.Scala может сделать это только для множеств, которые можно распараллелить.

import scala.collection.GenSeq
val seq = GenSeq("This", "is", "an", "example")
val chars = seq.par.aggregate(0)(_ + _.length, _ + _)

Итак, сначала он вычислит это:

0 + "This".length     // 4
0 + "is".length       // 2
0 + "an".length       // 2
0 + "example".length  // 7

То, что он делает дальше, не может быть предсказано (естьболее чем один способ объединения результатов), но он может сделать это (как в вышеприведенном искусстве ascii):

4 + 2 // 6
2 + 7 // 9

В этот момент он завершается с

6 + 9 // 15

, что даетконечный результат.Теперь, это немного похоже по структуре на foldLeft, но у него есть дополнительная функция (B, B) => B, которой нет.Эта функция, однако, позволяет ей работать параллельно!

Учтите, например, что каждое из четырех вычислений начальные вычисления не зависят друг от друга и могут выполняться параллельно.Следующие два (в результате 6 и 9) могут быть запущены после завершения вычислений, от которых они зависят, но эти два могут также выполняться параллельно.

7 вычислений, распараллеленных каквыше, может потребоваться всего лишь 3 последовательных вычисления.

На самом деле, при таком небольшом наборе затраты на синхронизацию вычислений будут достаточно большими, чтобы уничтожить любые выгоды.Кроме того, если вы сложите это, потребуется всего 4 вычислений.Однако, как только ваши коллекции становятся больше, вы начинаете видеть некоторые реальные выгоды.

Рассмотрим, с другой стороны, foldLeft.Поскольку он не имеет дополнительной функции, он не может распараллелить какие-либо вычисления:

(((0 + "This".length) + "is".length) + "an".length) + "example".length

Каждая внутренняя скобка должна быть вычислена до того, как внешняя скобка может продолжаться.

60 голосов
/ 03 августа 2011

Агрегатная функция этого не делает (за исключением того, что это очень общая функция, и ее можно использовать для этого).Вы хотите groupBy.Ближе к минимуму.Когда вы начинаете с Seq[(String, String)], и вы группируете, беря первый элемент в кортеже (который равен (String, String) => String), он возвращает Map[String, Seq[(String, String)]).Затем вам нужно отбросить первый параметр в значениях Seq [String, String)].

Итак

list.groupBy(_._1).mapValues(_.map(_._2))

Там вы получите Map[String, Seq[(String, String)].Если вы хотите Seq вместо Map, позвоните toSeq для результата.Я не думаю, что у вас есть гарантия заказа в полученном Seq, хотя


Aggregate - более сложная функция.

Сначала рассмотрим Reduce Left и ReduRight.Пусть as будет непустой последовательностью as = Seq(a1, ... an) элементов типа A, а f: (A,A) => A будет некоторым способом объединить два элемента типа A в один.Я отмечу это как бинарный оператор @, a1 @ a2, а не f(a1, a2).as.reduceLeft(@) будет вычислять (((a1 @ a2) @ a3)... @ an).reduceRight поставит скобки в другую сторону, (a1 @ (a2 @... @ an)))).Если @ оказывается ассоциативным, то не заботятся о скобках.Можно вычислить это как (a1 @... @ ap) @ (ap+1 @...@an) (внутри 2 больших парантезов тоже будут парантезы, но об этом не будем беспокоиться).Тогда можно было бы выполнить две части параллельно, в то время как вложенный брекетинг в reduLeft или reduRight заставляет полностью выполнять последовательные вычисления.Но параллельные вычисления возможны только тогда, когда известно, что @ является ассоциативным, а метод reduLeft не может этого знать.

Тем не менее, может существовать метод reduce, чей вызывающий будет отвечать за обеспечение того, чтобы операция была ассоциативной.Затем reduce упорядочит вызовы так, как считает нужным, возможно, делая их параллельно.Действительно, такой метод есть.

Однако существует ограничение на различные методы сокращения.Элементы Seq могут быть объединены только в результат того же типа: @ должно быть (A,A) => A.Но может возникнуть более общая проблема их объединения в B.Каждый начинается со значения b типа B и объединяет его с каждым элементом последовательности.Оператор @ равен (B,A) => B, а один вычисляет (((b @ a1) @ a2) ... @ an).foldLeft делает это.foldRight делает то же самое, но начинается с an.Там операция @ не имеет шансов быть ассоциативной.Когда кто-то пишет b @ a1 @ a2, это должно означать (b @ a1) @ a2, так как (a1 @ a2) будет напечатано неправильно.Так что foldLeft и foldRight должны быть последовательными.

Предположим, однако, что каждый A можно превратить в B, запишем его с !, a! типа B.Кроме того, предположим, что существует операция + (B,B) => B и что @ таков, что b @ a на самом деле b + a!.Вместо того, чтобы комбинировать элементы с @, можно сначала преобразовать все из них в B с помощью !, а затем объединить их с +.Это было бы as.map(!).reduceLeft(+).И если + является ассоциативным, то это можно сделать с помощью Reduce, а не быть последовательным: as.map (!). Reduce (+).Может быть гипотетический метод as.associativeFold (b,!, +).

Совокупность очень близка к этому.Однако может быть, что существует более эффективный способ реализации b@a, чем b+a! Например, если тип B равен List[A], а b @ a является a :: b, тогда a! будетa::Nil, а b1 + b2 будет b2 ::: b1.a :: b намного лучше, чем (a :: Nil) ::: b.Чтобы извлечь выгоду из ассоциативности, но все же использовать @, сначала нужно разделить b + a1! + ... + an! на (b + a1! + ap!) + (ap+1! + ..+ an!), а затем вернуться к использованию @ с (b @ a1 @ an) + (ap+1! @ @ an).Еще нужно!на ap + 1, потому что нужно начинать с некоторого b.И + все еще необходим, появляясь между парантезами.Для этого as.associativeFold(!, +) можно изменить на as.optimizedAssociativeFold(b, !, @, +).

Вернуться к +. + является ассоциативным или, что эквивалентно, (B, +) является полугруппой. На практике большинство полугрупп, используемых в программировании, тоже являются моноидами, т.е. они содержат нейтральный элемент z (для ноль ) в B, так что для каждого b, z + b = b + z = b. В этом случае операция !, которая имеет смысл, скорее всего, будет a! = z @ a. Более того, поскольку z является нейтральным элементом b @ a1 ..@ an = (b + z) @ a1 @ an, который равен b + (z + a1 @ an). Так что всегда можно начать агрегацию с z. Если вместо этого требуется b, вы делаете b + result в конце. Со всеми этими гипотезами мы можем сделать s.aggregate(z, @, +). Это то, что делает aggregate. @ - это аргумент seqop (применяется в последовательности z @ a1 @ a2 @ ap), а + равен combop (применяется к уже частично объединенным результатам, как в (z + a1@...@ap) + (z + ap+1@...@an)).

Подводя итог, as.aggregate(z)(seqop, combop) вычисляет то же самое, что и as.foldLeft(z)( seqop), при условии, что

  • (B, combop, z) является моноидом
  • seqop(b,a) = combop(b, seqop(z,a))

Агрегированная реализация может использовать ассоциативность combop для группировки вычислений по своему усмотрению (однако, не меняет местами элементы, + не должен быть коммутативным, ::: нет). Это может запустить их параллельно.

Наконец, решение начальной задачи с использованием aggregate оставлено читателю в качестве упражнения. Подсказка: используйте foldLeft, затем найдите z и combo, которые будут удовлетворять условиям, указанным выше.

10 голосов
/ 03 августа 2011

Подпись для коллекции с элементами типа A:

def aggregate [B] (z: B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B 
  • z - это объект типа B, действующий как нейтральный элемент.Если вы хотите что-то посчитать, вы можете использовать 0, если вы хотите построить список, начать с пустого списка и т. Д.
  • segop аналогична функции, которую вы передаете методам fold.Он принимает два аргумента: первый - того же типа, что и нейтральный элемент, который вы передали, и представляет материал, который уже был агрегирован на предыдущей итерации, второй - следующий элемент вашей коллекции.Результат также должен иметь тип B.
  • combop: это функция, объединяющая два результата в один.

В большинстве коллекций агрегат реализован в TraversableOnceas:

  def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B 
    = foldLeft(z)(seqop)

Таким образом combop игнорируется.Однако имеет смысл для параллельных коллекций , потому что seqop сначала будет применяться локально параллельно, а затем вызывается combop для завершения агрегации.

Так что для вашего примера,сначала вы можете попробовать сгибать:

val seqOp = 
  (map:Map[String,Set[String]],tuple: (String,String)) => 
    map + ( tuple._1 -> ( map.getOrElse( tuple._1, Set[String]() ) + tuple._2 ) )


list.foldLeft( Map[String,Set[String]]() )( seqOp )
// returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))

Затем вам нужно найти способ свертывания двух мультикарт:

val combOp = (map1: Map[String,Set[String]], map2: Map[String,Set[String]]) =>
       (map1.keySet ++ map2.keySet).foldLeft( Map[String,Set[String]]() ) { 
         (result,k) => 
           result + ( k -> ( map1.getOrElse(k,Set[String]() ) ++ map2.getOrElse(k,Set[String]() ) ) ) 
       } 

Теперь вы можете использовать агрегат параллельно:

list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
//Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))

Применение метода «par» к списку, таким образом используя параллельный набор (scala.collection.parallel.immutable.ParSeq) списка, чтобы действительно использовать преимущества многоядерных процессоров.Без «par» не будет никакого увеличения производительности, так как агрегат не выполняется в параллельной коллекции.

9 голосов
/ 03 августа 2011

aggregate похоже на foldLeft, но может выполняться параллельно.

Как говорит отсутствующий фактор , линейная версия aggregate(z)(seqop, combop) эквивалентна foldleft(z)(seqop). Это, однако, нецелесообразно в параллельном случае, когда нам нужно было бы объединить не только следующий элемент с предыдущим результатом (как в обычном сгибе), но мы хотим разделить итерируемое на подтерибели, которые мы называем агрегатом, и нам нужно объединить их снова. (В порядке слева направо, но не ассоциативно, так как мы могли бы объединить последние части перед первыми частями итерируемого.) Это повторное объединение в общем нетривиально, и, следовательно, нужен метод (S, S) => S для выполнить это.

Определение в ParIterableLike:

def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
  executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}

, который действительно использует combop.

Для справки, Aggregate определяется как:

protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: IterableSplitter[T])
  extends Accessor[S, Aggregate[S]] {
    @volatile var result: S = null.asInstanceOf[S]
    def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
    protected[this] def newSubtask(p: IterableSplitter[T]) = new Aggregate(z, seqop, combop, p)
    override def merge(that: Aggregate[S]) = result = combop(result, that.result)
}

Важной частью является merge, где combop применяется с двумя промежуточными результатами.

3 голосов
/ 29 ноября 2011

Вот блог о том, как агрегировать производительность на многоядерном процессоре с тестом производительности.http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/

Вот видео на тему "Параллельные коллекции Scala" из "Scala Days 2011".http://days2011.scala -lang.org / node / 138/272

Описание на видео

Параллельные коллекции Scala

Александар Прокопец

Абстракции параллельного программирования приобретают все большее значение по мере роста числа процессорных ядер.Модель программирования высокого уровня позволяет программисту сосредоточиться больше на программе и меньше на деталях низкого уровня, таких как синхронизация и балансировка нагрузки.Параллельные коллекции Scala расширяют модель программирования инфраструктуры сбора Scala, обеспечивая параллельные операции над наборами данных.В докладе будет описана архитектура платформы параллельного сбора, объяснены их реализация и проектные решения.Будут описаны конкретные реализации коллекций, такие как параллельные хэш-карты и параллельные хэш-попытки.Наконец, будут показаны несколько примеров приложений, демонстрирующих модель программирования на практике.

1 голос
/ 21 мая 2016

Просто чтобы прояснить объяснения тех, кто был до меня, теоретически идея состоит в том, что агрегат должен работать следующим образом (я изменил названия параметров, чтобы сделать их более понятными):

Seq(1,2,3,4).aggragate(0)(
     addToPrev = (prev,curr) => prev + curr, 
     combineSums = (sumA,sumB) => sumA + sumB)

Должен логическиtranslate to

Seq(1,2,3,4)
    .grouped(2) // split into groups of 2 members each
    .map(prevAndCurrList => prevAndCurrList(0) + prevAndCurrList(1))
    .foldLeft(0)(sumA,sumB => sumA + sumB)

Поскольку агрегация и отображение разделены, исходный список может быть теоретически разделен на разные группы разных размеров и работать параллельно или даже на другой машине.На практике текущая реализация scala не поддерживает эту функцию по умолчанию, но вы можете сделать это в своем собственном коде.

1 голос
/ 03 августа 2011

Определение aggregate в TraversableOnce источнике:

def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B = 
  foldLeft(z)(seqop)

, которое ничем не отличается от простого foldLeft.combop, кажется, нигде не используется.Я сам не понимаю, какова цель этого метода.

...