Преобразование метода Scala @suspendable в будущее - PullRequest
12 голосов
/ 03 сентября 2011

предположим, у меня есть функция сна:

def sleep(delay:Int) : Unit @suspendable = {
  ....
}

возможно ли иметь функцию future, которая создает асинхронную версию функции сна, которую можно ожидать синхронно.

def future(targetFunc: (Int => Unit @suspendable)) : (Int => Future) = {
    ....
}

class Future {
  def await : Unit @suspendable = {
     ....
  }
}

вы должны быть в состоянии сделать что-то вроде этого:

reset {
  val sleepAsync = future(sleep)
  val future1 = sleepAsync(2000)
  val future2 = sleepAsync(3000)
  future1.await
  future2.await
  /* finishes after a delay of 3000 */
}

два вызова sleepAsync должны сразу же вернуться, и два вызова Future # await должны блокироваться.конечно, все они действительно сбрасываются в конце сброса, и код после отвечает за вызов продолжения после задержки.

в противном случае есть ли альтернативный метод для параллельного запуска двух функций @suspendable и ожидания завершения обеих из них?

У меня есть компилируемая сущность со скелетом того, что я хочу сделать:https://gist.github.com/1191381

Ответы [ 2 ]

2 голосов
/ 20 января 2012
object Forks {

  import scala.util.continuations._

  case class Forker(forks: Vector[() => Unit @suspendable]) {
    def ~(block: => Unit @suspendable): Forker = Forker(forks :+ (() => block))
    def joinIf(pred: Int => Boolean): Unit @suspendable = shift { k: (Unit => Unit) =>
      val counter = new java.util.concurrent.atomic.AtomicInteger(forks.size)
      forks foreach { f =>
        reset {
          f()
          if (pred(counter.decrementAndGet)) k()
        }
      }
    }
    def joinAll() = joinIf(_ == 0)
    def joinAny() = joinIf(_ == forks.size - 1)
  }

  def fork(block: => Unit @suspendable): Forker = Forker(Vector(() => block))
}

используя fork (), теперь мы можем подождать много «приостановок».используйте ~ (), чтобы связать воедино суспендируемые элементы.используйте joinAll () для ожидания всех приостановок и joinAny () для ожидания только одного.используйте joinIf () для настройки стратегии соединения.

object Tests extends App {

  import java.util.{Timer, TimerTask}
  import scala.util.continuations._

  implicit val timer = new Timer

  def sleep(ms: Int)(implicit timer: Timer): Unit @suspendable = {
    shift { k: (Unit => Unit) =>
      timer.schedule(new TimerTask {
        def run = k()
      }, ms)
    }
  }

  import Forks._

  reset {
    fork {
      println("sleeping for 2000 ms")
      sleep(2000)
      println("slept for 2000 ms")
    } ~ {
      println("sleeping for 4000 ms")
      sleep(4000)
      println("slept for 4000 ms")
    } joinAll()
    println("and we are done")
  }
  println("outside reset")
  readLine
  timer.cancel
}

, и это вывод.Программа запускается в момент времени T:

sleeping for 2000 ms
sleeping for 4000 ms
outside reset         <<<<<< T + 0 second
slept for 2000 ms     <<<<<< T + 2 seconds
slept for 4000 ms     <<<<<< T + 4 seconds
and we are done       <<<<<< T + 4 seconds
1 голос
/ 20 января 2012

Я не уверен, что полностью понимаю вопрос, но вот попытка:

import scala.util.continuations._

class Future(thread: Thread) {
  def await = thread.join
}

object Future {

  def sleep(delay: Long) = Thread.sleep(delay)

  def future[A,B](f: A => B) = (a: A) => shift { k: (Future => Unit) =>
    val thread = new Thread { override def run() { f(a) } }
    thread.start()

    k(new Future(thread))
  }

  def main(args:Array[String]) = reset {
    val sleepAsync = future(sleep)
    val future1 = sleepAsync(2000) // returns right away
    val future2 = sleepAsync(3000) // returns right away
    future1.await // returns after two seconds
    future2.await // returns after an additional one second
    // finished after a total delay of three seconds
  }
}

Здесь экземпляр Future является не чем иным, как дескриптором Thread, поэтому вы можете использовать его метод join для блокировки до его завершения.

Функция future принимает функцию типа A => B и возвращает функцию, которая при наличии A запускает поток для запуска функции "futured" и заключает ее в Future, который вводится обратно в продолжение, тем самым присваивая его val future1.

Это где-нибудь близко к тому, что вы собирались?

...