Инициирование сеанса потока из потока, помеченного с помощью InitiatedBy, к потоку, который также является InitiatedBy - PullRequest
0 голосов
/ 23 ноября 2018

Можно ли инициировать сеанс потока из потока, который помечен с помощью InitiatedBy, к потоку, который также помечен с помощью InitiatedBy?

Например:

@InitiatingFlow Class FlowA

@InitiatedBy(FlowA.class) Class FlowB

@InitiatedBy(FlowB.class) Class FlowC

Возможно ли достичь последовательности сеансов потока, например: A-> B-> C?

1 Ответ

0 голосов
/ 26 ноября 2018

Да, это возможно следующим образом:

@InitiatingFlow
@StartableByRPC
class Initiator(val firstCounterparty: Party, val secondCounterparty: Party) : FlowLogic<Int>() {
    override val progressTracker = ProgressTracker()

    @Suspendable
    override fun call(): Int {
        val flowSession = initiateFlow(firstCounterparty)
        flowSession.send(secondCounterparty)
        return flowSession.receive<Int>().unwrap { it }
    }
}

@InitiatingFlow
@InitiatedBy(Initiator::class)
class Responder(val flowSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val secondCounterparty = flowSession.receive<Party>().unwrap { it }
        val newFlowSession = initiateFlow(secondCounterparty)
        val int = newFlowSession.receive<Int>().unwrap { it }
        flowSession.send(int)
    }
}

@InitiatingFlow
@InitiatedBy(Responder::class)
class ResponderResponder(val flowSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        flowSession.send(3)
    }
}

Однако есть одно важное предостережение. В Corda 3.x не может быть двух FlowSessionс тем же контрагентом в том же потоке.Так что либо вам нужно запретить случай, когда A -> B -> A, следующим образом:

@InitiatingFlow
@StartableByRPC
class Initiator(val firstCounterparty: Party, val secondCounterparty: Party) : FlowLogic<Int>() {
    override val progressTracker = ProgressTracker()

    @Suspendable
    override fun call(): Int {
        if (secondCounterparty == ourIdentity) {
            throw FlowException("In Corda 3.x, you can't have two flow sessions with the same party.")
        }

        val flowSession = initiateFlow(firstCounterparty)
        flowSession.send(secondCounterparty)
        return flowSession.receive<Int>().unwrap { it }
    }
}

@InitiatingFlow
@InitiatedBy(Initiator::class)
class Responder(val flowSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val secondCounterparty = flowSession.receive<Party>().unwrap { it }
        if (secondCounterparty == flowSession.counterparty) {
            throw FlowException("In Corda 3.x, you can't have two flow sessions with the same party.")
        }

        val newFlowSession = initiateFlow(secondCounterparty)
        val int = newFlowSession.receive<Int>().unwrap { it }
        flowSession.send(int)
    }
}

@InitiatingFlow
@InitiatedBy(Responder::class)
class ResponderResponder(val flowSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        flowSession.send(3)
    }
}

, либо вам нужно перейти в подпоток InitiatingFlow в Responder перед запуском потока, который начинаетсяResponderResponder, следующим образом:

@InitiatingFlow
@StartableByRPC
class Initiator(val firstCounterparty: Party, val secondCounterparty: Party) : FlowLogic<Int>() {
    override val progressTracker = ProgressTracker()

    @Suspendable
    override fun call(): Int {
        val flowSession = initiateFlow(firstCounterparty)
        flowSession.send(secondCounterparty)
        return flowSession.receive<Int>().unwrap { it }
    }
}

@InitiatingFlow
@InitiatedBy(Initiator::class)
class Responder(val flowSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val secondCounterparty = flowSession.receive<Party>().unwrap { it }
        val int = subFlow(ResponderInitiator(secondCounterparty))
        flowSession.send(int)
    }
}

@InitiatingFlow
class ResponderInitiator(val counterparty: Party) : FlowLogic<Int>() {
    @Suspendable
    override fun call(): Int {
        val flowSession = initiateFlow(counterparty)
        return flowSession.receive<Int>().unwrap { it }
    }
}

@InitiatingFlow
@InitiatedBy(ResponderInitiator::class)
class ResponderResponder(val flowSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        flowSession.send(3)
    }
}
...