Использование продолжений scala со слушателями netty / NIO - PullRequest
9 голосов
/ 08 февраля 2012

Я использую библиотеку Netty (версия 4 от GitHub). Это прекрасно работает в Scala, но я надеюсь, что моя библиотека сможет использовать стиль передачи продолжения для асинхронного ожидания.

Традиционно с Netty вы делаете что-то вроде этого (пример асинхронной операции подключения):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

Если вы реализуете библиотеку (которой я являюсь), то у вас в основном есть три простых варианта, позволяющих пользователю библиотеки делать вещи после установления соединения:

  1. Просто верните ChannelFuture из вашего метода connect и позвольте пользователю справиться с ним - это не дает большой абстракции от netty.
  2. Возьмите ChannelFutureListener в качестве параметра вашего метода соединения и добавьте его в качестве прослушивателя в ChannelFuture.
  3. Возьмите объект функции обратного вызова в качестве параметра вашего метода connect и вызовите его из созданного вами ChannelFutureListener (это будет делать стиль, управляемый обратным вызовом, похожий на node.js)

То, что я пытаюсь сделать, это четвертый вариант; Я не включил его в число выше, потому что это не просто.

Я хочу использовать продолжения с разделителями scala, чтобы сделать использование библиотеки чем-то похожим на блокирующую библиотеку, но она будет неблокируемой за кулисами:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

Представьте, что другие операции чтения / записи выполняются таким же образом. Цель этого в том, чтобы код пользователя мог выглядеть примерно так:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

Другими словами, программа будет выглядеть как простая программа в стиле блокировки, но за кулисами не будет никаких блокировок или потоков.

Проблема, с которой я сталкиваюсь, заключается в том, что я не до конца понимаю, как работает набор продолжений с разделителями. Когда я пытаюсь реализовать его описанным выше способом, компилятор жалуется, что моя реализация operationComplete фактически возвращает Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] вместо Unit. Я понял, что в CPS Scala есть своего рода «уловка» в том, что вы должны аннотировать тип возврата метода shift с помощью @suspendable, который передается по стеку вызовов до reset, но это не похоже быть любым способом согласовать это с уже существующей библиотекой Java, которая не имеет понятия продолжения с разделителями.

Я чувствую, что действительно должен быть способ обойти это - если Swarm может сериализовать продолжения и подключать их по сети для вычисления в другом месте, тогда должна быть возможность просто вызвать продолжение из уже существующего Java-класса. Но я не могу понять, как это можно сделать. Должен ли я переписать все части netty в Scala, чтобы это произошло?

1 Ответ

4 голосов
/ 08 февраля 2012

Я нашел это объяснение продолжений Scala чрезвычайно полезным, когда только начинал.В частности обратите внимание на части, где он объясняет shift[A, B, C] и reset[B, C].Добавление фиктивного null в качестве последнего утверждения operationComplete должно помочь.

Кстати, вам нужно вызвать retrn() внутри другого reset, если внутри него может быть вложено shift.

Редактировать: Вот рабочий пример

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}

с возможным выводом:

Outside reset
operationComplete starts
This will happen after the connection is finished
...