Можем ли мы определить набор операций DSL в Scala, которые выполняются параллельно друг с другом, как при использовании конвейерной обработки в Linux - PullRequest
5 голосов
/ 17 января 2012

Простите меня за плохой английский, но я постараюсь выразить свой вопрос.

Предположим, я хочу обработать большой текст, операция которого заключается в фильтрации содержимого по ключевому слову; измените их на строчные; а затем распечатать их на стандартный вывод. Как мы все знаем, мы можем сделать это с помощью конвейера в Linux BASH-скрипте:

cat article.txt | grep "I" | tr "I" "i" > /dev/stdout

, где cat article.txt, grep "I", tr "I" "i" > /dev/stdout работают параллельно.

В Scala мы, вероятно, делаем это так:

//or read from a text file , e.g. article.txt 
val strList = List("I", "am", "a" , "student", ".", "I", "come", "from", "China", ".","I","love","peace")  
strList.filter( _ == "I").map(_.toLowerCase).foreach(println)

Мой вопрос: как мы можем сделать filter, map и foreach параллельными?

ТНХ

Ответы [ 5 ]

2 голосов
/ 18 января 2012

Используйте представление:

val strList = List("I", "am", "a" , "student", ".", "I", "come", "from", "China", ".","I","love","peace")  // or read from a text file , e.g. article.txt 
strList.view.filter( _ == "I").map(_.toLowerCase).foreach(println)

Представления хранят операции над коллекциями (в данном случае filter и map) и выполняют их только при запросе от них элементов (foreach в этомдело).Поэтому сначала он применил фильтр и сопоставил бы с «Я», затем с «Я» и т. Д.

2 голосов
/ 17 января 2012
Решение

Цтеннера, вероятно, является наиболее эффективным решением в вашей ситуации, поскольку оно может обеспечить высокую степень параллелизма (теоретически каждая отдельная вещь может обрабатываться параллельно).Тем не менее, ваш пример bash просто использует конвейерный параллелизм, и этот вид параллелизма, к сожалению, не поддерживается напрямую параллельными коллекциями Scalas.

Для достижения конвейерного параллелизма ваши операторы (filter, map, foreach) должны выполняться разнымипотоки, например, с помощью Actors.

В общем, я думаю, что для Scala было бы неплохо иметь простой API для этого.Но для вашего примера я сомневаюсь, что параллелизм конвейера сильно ускорит ваше выполнение.Если вы просто используете очень простые операции фильтрации и отображения, я предполагаю, что накладные расходы на связь (для почтовых ящиков FIFO / Actor) потребляют полное ускорение вашего параллельного выполнения.

2 голосов
/ 17 января 2012

Если вы измените свой Список на Итератор, вы увидите, что фильтр / карта / foreach больше не группируются.

Попробуйте это:

val strList = Iterator("I", "am", "a" , "student", ".", "I", "come", "from", "China", ".","I","love","peace")  
strList.filter{ s => println("f"); s == "I"}.map{s => println("m"); s.toLowerCase}.foreach{s =>println("p")}

Вы увидите:
fmpfffffmpfffffmpff

Вместо: fffffffffffffmmmppp

Потому что, когда вы применяете преобразование к списку, он сразу же возвращает новый список.Но когда вы применяете преобразование к Итератору, оно будет запускаться только тогда, когда вы пройдете его (в данном случае foreach).

2 голосов
/ 17 января 2012

В 2.9 были добавлены параллельные коллекции. Чтобы распараллелить цикл, все, что вам нужно сделать, это преобразовать его, вызвав функцию-член par.

Ваш код будет выглядеть так:

val strList = List("I", "am", "a" , "student", ".", "I", "come", "from", "China", ".","I","love","peace")  // or read from a text file , e.g. article.txt 
strList.<b>par.</b>filter( _ == "I").map(_.toLowerCase).foreach(println)
0 голосов
/ 17 января 2012

Создайте функцию для одного аргумента из вашей цепочки функций. Затем примените эту функцию к параллельной коллекции. Обратите внимание, что println не будет вызываться в порядке исходной коллекции.

def fmp(xs: Seq[String]){
  xs.par.foreach{x => 
    for(
      kw <- Option(x).filter(_ == "I"); 
      lc <- kw.map(_.toLowerCase)
    ) println(lc)
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...