К сожалению, ваш вопрос сейчас неясен. При определенных допущениях то, что вы описываете, выглядит как то, что сейчас называется «FRP» или «Функциональное реактивное программирование» . Если вы хотите сделать это серьезно, вам, вероятно, следует взглянуть на некоторые зрелые библиотеки, такие как RxScala или Monix , которые обрабатывают многие важные в реальном мире детали, такие как ошибка обработка или планирование / создание потоков и многое другое.
Для простой задачи вы можете развернуть простую пользовательскую реализацию, подобную этой:
trait Observable {
def subscribe(subscriber: Subscriber): RxConnection
}
trait RxConnection {
def disconnect(): Unit
}
trait Subscriber {
def onChanged(): Unit
}
trait RxOut[T] extends Observable {
def currentValue: Option[T]
}
class MulticastObservable extends Observable with Subscriber {
private val subscribers: mutable.Set[Subscriber] = mutable.HashSet()
override def onChanged(): Unit = subscribers.foreach(s => s.onChanged())
override def subscribe(subscriber: Subscriber): RxConnection = {
subscribers.add(subscriber)
new RxConnection {
override def disconnect(): Unit = subscribers.remove(subscriber)
}
}
}
abstract class BaseRxOut[T](private var _lastValue: Option[T]) extends RxOut[T] {
private val multicast = new MulticastObservable()
protected def lastValue: Option[T] = _lastValue
protected def lastValue_=(value: Option[T]): Unit = {
_lastValue = value
multicast.onChanged()
}
override def currentValue: Option[T] = lastValue
override def subscribe(subscriber: Subscriber): RxConnection = multicast.subscribe(subscriber)
}
class RxValue[T](initValue: T) extends BaseRxOut[T](Some(initValue)) {
def value: T = this.lastValue.get
def value_=(value: T): Unit = {
this.lastValue = Some(value)
}
}
trait InputConnector[T] {
def connectInput(input: RxOut[T]): RxConnection
}
class InputConnectorImpl[T] extends BaseRxOut[T](None) with InputConnector[T] {
val inputHolder = new RxValue[Option[(RxOut[T], RxConnection)]](None)
private def updateValue(): Unit = {
lastValue = for {inputWithDisconnect <- inputHolder.value
value <- inputWithDisconnect._1.currentValue}
yield value
}
override def connectInput(input: RxOut[T]): RxConnection = {
val current = inputHolder.value
if (current.exists(iwd => iwd._1 == input))
current.get._2
else {
current.foreach(iwd => iwd._2.disconnect())
inputHolder.value = Some(input, input.subscribe(() => this.updateValue()))
updateValue()
new RxConnection {
override def disconnect(): Unit = {
if (inputHolder.value.exists(iwd => iwd._1 == input)) {
inputHolder.value.foreach(iwd => iwd._2.disconnect())
inputHolder.value = None
updateValue()
}
}
}
}
}
}
abstract class BaseRxCalculation[Out] extends BaseRxOut[Out](None) {
protected def registerConnectors(connectors: InputConnectorImpl[_]*): Unit = {
connectors.foreach(c => c.subscribe(() => this.recalculate()))
}
private def recalculate(): Unit = {
var newValue = calculateOutput()
if (newValue != lastValue) {
lastValue = newValue
}
}
protected def calculateOutput(): Option[Out]
}
case class RxCalculation1[In1, Out](func: Function1[In1, Out]) extends BaseRxCalculation[Out] {
private val conn1Impl = new InputConnectorImpl[In1]
def conn1: InputConnector[In1] = conn1Impl // show to the outer world only InputConnector
registerConnectors(conn1Impl)
override protected def calculateOutput(): Option[Out] = {
for {v1 <- conn1Impl.currentValue}
yield func(v1)
}
}
case class RxCalculation2[In1, In2, Out](func: Function2[In1, In2, Out]) extends BaseRxCalculation[Out] {
private val conn1Impl = new InputConnectorImpl[In1]
def conn1: InputConnector[In1] = conn1Impl // show to the outer world only InputConnector
private val conn2Impl = new InputConnectorImpl[In2]
def conn2: InputConnector[In2] = conn2Impl // show to the outer world only InputConnector
registerConnectors(conn1Impl, conn2Impl)
override protected def calculateOutput(): Option[Out] = {
for {v1 <- conn1Impl.currentValue
v2 <- conn2Impl.currentValue}
yield func(v1, v2)
}
}
// add more RxCalculationN if needed
И вы можете использовать это так:
def test(): Unit = {
val pipe2 = new RxCalculation1((in: Double) => {
println(s"in = $in")
val vel = 3.4
val V = 300
val a = 10.2
val TotV = V + in
TotV * a / vel
})
val in1 = new RxValue(2.0)
println(pipe2.currentValue)
val conn1 = pipe2.conn1.connectInput(in1)
println(pipe2.currentValue)
in1.value = 3.0
println(pipe2.currentValue)
conn1.disconnect()
println(pipe2.currentValue)
}
который печатает
нет
Отсутствует
in = 2.0
Некоторые (905.9999999999999)
in = 3.0
Немного (909,0)
Нет
Здесь ваш «канал» - это RxCalculation1
(или другой RxCalculationN
), который оборачивает функцию, и вы можете «подключать» и «отключать» другие «каналы» или просто «значения» к различным входам и запускать цепочку обновления.