Scala поток и выпуск ExecutionContext - PullRequest
0 голосов
/ 07 января 2020

Я новичок в Scala, и у меня возникло несколько проблем в моем назначении: я хочу создать потоковый класс, который может выполнять 3 основные задачи: filter, map и forEach. Мой поток s data is an array of elements. Each of the 3 main tasks should run in 2 different threads on my stream с массивом. Кроме того, мне нужно разделить лог c действия и его фактический запуск на две разные части. Сначала объявляйте все задачи в потоке, и только когда я запускаю stream.run(), я хочу, чтобы фактические действия происходили.

Мой код:

class LearningStream[A]() {
  val es: ExecutorService = Executors.newFixedThreadPool(2)
  val ec = ExecutionContext.fromExecutorService(es)
  var streamValues: ArrayBuffer[A] = ArrayBuffer[A]()
  var r: Runnable = () => "";

  def setValues(streamv: ArrayBuffer[A]) = {
    streamValues = streamv;
  }

  def filter(p: A => Boolean): LearningStream[A] = {
    var ls_filtered: LearningStream[A] = new LearningStream[A]()
    r = () => {
      println("running real filter..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[A]=es.submit(()=>l.filter(p)).get()
      val b:ArrayBuffer[A]=es.submit(()=>r.filter(p)).get()
      ms_filtered.setValues(a++b)
    }
    return ls_filtered
  }

  def map[B](f: A => B): LearningStream[B] = {
    var ls_map: LearningStream[B] = new LearningStream[B]()
    r = () => {
      println("running real map..")
      val (l,r) = streamValues.splitAt(streamValues.length/2)
      val a:ArrayBuffer[B]=es.submit(()=>l.map(f)).get()
      val b:ArrayBuffer[B]=es.submit(()=>r.map(f)).get()
      ls_map.setValues(a++b)
    }
    return ls_map
  }

  def forEach(c: A => Unit): Unit = {
    r=()=>{
      println("running real forEach")
      streamValues.foreach(c)}
  }

   def insert(a: A): Unit = {
    streamValues += a
  }

  def start(): Unit = {
    ec.submit(r)
  }

   def shutdown(): Unit = {
    ec.shutdown()
  }
}

мой основной:

def main(args: Array[String]): Unit = {
    var factorial=0
    val s = new LearningStream[String]
    s.filter(str=>str.startsWith("-")).map(s=>s.toInt*(-1)).forEach(i=>factorial=factorial*i)

    for(i <- -5 to 5){
      s.insert(i.toString)
    }
    println(s.streamValues)
    s.start()
    println(factorial)
    }

Основное печатает только выходные данные фильтра, а факториал не изменяется (по-прежнему 1). Что мне здесь не хватает?

1 Ответ

0 голосов
/ 10 января 2020

Мое решение: @ Леви Рэмси оставил несколько хороших подсказок в комментариях, если вы хотите получить подсказки, а не реальное решение.

Первая проблема: только одна команда (фильтр) была выполнена, а другая - нет. решение: вставьте в исполняемый файл каждой команды вызов для следующего потока через:

ec.submit(ms_map.r)

Чтобы иметь возможность закрыть все сеансы, нам нужно добавить еще один элемент данных LearningStream в класс. Однако мы не можем добавить только обычный объект LearningStream, поскольку он зависит от параметра [A]. Следовательно. Я реализовал черту, которая имеет функцию закрытия, и мой член данных имел этот тип черты.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...