Идиоматический способ обработки нескольких одновременных потоков в Scala - PullRequest
0 голосов
/ 28 мая 2018

У меня есть список потоков, которые после вызова их next() будут спать в произвольном количестве времени и затем читать один символ из другого источника.

Я пытаюсь написать потребителя (ей), чтобудет продолжать вызывать эти потоки до EOF и создавать общий словарь этих потоков во время выполнения.

Пока что я использую ConcurrentHashMap для словаря и просто создаю новый поток для каждого из потребителей потока.

, хотя мое решение работает, оно кажется очень наивным, и мне интересно, есть ли лучшее применение для потоковой библиотеки, такой как monix или fs2

1 Ответ

0 голосов
/ 29 мая 2018

Исходя из описания вопроса и последующих комментариев, я предполагаю, что существует несколько Iterator[Char] источников:

val allSources : Iterable[Iterator[Char]] = ???

И вопрос заключается в следующем: как одновременно собрать String значения из этихИтераторы для формирования отображения String для подсчета.

Потоковое решение

Сначала нам нужно преобразовать каждого из итераторов в итератор значений String на основе разделителя.:

trait Word {
  val data : String
}

object EmptyWord extends Word {
  override val data = ""
}

case class PartialWord(val data : String) extends Word

case class WholeWord(val data : String) extends Word

val appendToWord : Char => (Word, Char) => Word = 
  (separator) => (originalWord, appendChar) => originalWord match {
    case PartialWord(d) => 
      if(appendChar == separator)
        WholeWord(d)
      else
        PartialWord(d + appendChar)
    case _ => PartialWord(appendChar.toString)
  }

val isWholeWord : Word => Boolean = (_ : Word) match {
  case _ : WholeWord => true
  case _             => false
}

//using space as separator
val convertCharIterator : Iterator[Char] => Iterator[String] = 
  (_ : Iterator[Char])
    .scanLeft(EmptyWord)(appendToWord(' '))
    .filter(isWholeWord)
    .map(_.data)

Теперь мы можем преобразовать все итераторы для генерации строк и объединить все итераторы в один итератор:

val allWordSource : Iterator[String] = 
  allSources.map(convertCharIterator)
            .reduceOption( _ ++ _)
            .getOrElse(Iterator.empty[String])

Теперь этот итератор может быть источникомпотока akka, который вычислит ваш счет:

val addToCounter : (Map[String, Int], String) => Map[String, Int] = 
  (counter, word) => 
    counter.updated(word, counter.getOrElse(word, 0) + 1)

val counter : Future[Map[String, Int]] = 
  Source
    .fromIterator( () => allWordSource)
    .runFold(Map.empty[String, Int])(addToCounter)
...