Как вызвать подпоток в FlowResponder - PullRequest
0 голосов
/ 15 января 2019

У меня проблема с потоком вызовов в FlowResponder.

Сценарий объяснения

Как видите, мне нужно вызывать подпоток для вставки / обновления только для NodeB. Могу я это сделать?

Здесь, когда я пытаюсь, я получил эту ошибку

java.io.IOException: полезная нагрузка недействительна at net.corda.node.services.statemachine.FlowStateMachineImplKt.checkPayloadIs (FlowStateMachineImpl.kt: 605) ~ [corda-node-3.3-corda.jar :?] at net.corda.node.services.statemachine.FlowStateMachineImpl.receive (FlowStateMachineImpl.kt: 231) ~ [corda-node-3.3-corda.jar :?] в net.corda.node.services.statemachine.FlowSessionImpl.receive (FlowSessionImpl.kt: 44) ~ [corda-node-3.3-corda.jar :?] в net.corda.node.services.statemachine.FlowSessionImpl.receive (FlowSessionImpl.kt: 48) ~ [corda-node-3.3-corda.jar :?] at net.corda.core.flows.ReceiveTransactionFlow.call (ReceiveTransactionFlow.kt: 74) ~ [corda-core-3.3-corda.jar :?] at net.corda.core.flows.ReceiveTransactionFlow.call (ReceiveTransactionFlow.kt: 24) ~ [corda-core-3.3-corda.jar :?] at net.corda.core.flows.FlowLogic.subFlow (FlowLogic.kt: 290) ~ [corda-core-3.3-corda.jar :?] at net.corda.core.flows.SignTransactionFlow.call (CollectSignaturesFlow.kt: 212) ~ [corda-core-3.3-corda.jar :?] at net.corda.core.flows.SignTransactionFlow.call (CollectSignaturesFlow.kt: 197) ~ [corda-core-3.3-corda.jar :?] at net.corda.core.flows.FlowLogic.subFlow (FlowLogic.kt: 290) ~ [corda-core-3.3-corda.jar :?] в th.co.jventures.ddlp.cordapp.flows.CustomerIssueFlowResponder.call (CustomerIssueFlow.kt: 196) ~ [classes / :?] в th.co.jventures.ddlp.cordapp.flows.CustomerIssueFlowResponder.call (CustomerIssueFlow.kt: 177) ~ [classes / :?] at net.corda.node.services.statemachine.FlowStateMachineImpl.run (FlowStateMachineImpl.kt: 96) [corda-node-3.3-corda.jar :?] at net.corda.node.services.statemachine.FlowStateMachineImpl.run (FlowStateMachineImpl.kt: 44) [corda-node-3.3-corda.jar :?] at co.paralleluniverse.fibers.Fiber.run1 (Fiber.java:1092) [quasar-core-0.7.9-jdk8.jar: 0.7.9] at co.paralleluniverse.fibers.Fiber.exec (Fiber.java:788) [quasar-core-0.7.9-jdk8.jar: 0.7.9] at co.paralleluniverse.fibers.RunnableFiberTask.doExec (RunnableFiberTask.java:100) [quasar-core-0.7.9-jdk8.jar: 0.7.9] at co.paralleluniverse.fibers.RunnableFiberTask.run (RunnableFiberTask.java:91) [quasar-core-0.7.9-jdk8.jar: 0.7.9] в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) [?: 1.8.0_181] на java.util.concurrent.FutureTask.run $$$ capture (FutureTask.java:266) [?: 1.8.0_181] в java.util.concurrent.FutureTask.run (FutureTask.java) [?: 1.8.0_181] в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201 (ScheduledThreadPoolExecutor.java:180) [?: 1.8.0_181] в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) [?: 1.8.0_181] в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) [?: 1.8.0_181] в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) [?: 1.8.0_181] at net.corda.node.utilities.AffinityExecutor $ ServiceAffinityExecutor $ 1 $ thread $ 1.run (AffinityExecutor.kt: 62) [corda-node-3.3-corda.jar :?] Вызвано: java.io.NotSerializableException: описанный тип с дескриптором net.corda: Rd6hxg + 0oJROxKDXK8OerA == ожидалось, что он относится к классу типа net.corda.core.transactions.SignedTransaction, но был java.util.List at net.corda.nodeapi.internal.serialization.amqp.DeserializationInput.readObject $ node_api (DeserializationInput.kt: 133) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.amqp.DeserializationInput.readObjectOrNull $ node_api (DeserializationInput.kt: 109) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.amqp.DeserializationInput.readObjectOrNull $ node_api $ default (DeserializationInput.kt: 108) ~ [corda-node-api-3.3-corda.jar :?]at net.corda.nodeapi.internal.serialization.amqp.DeserializationInput $ deserialize $ 1.invoke (DeserializationInput.kt: 98) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.amqp.DeserializationInput.des (DeserializationInput.kt: 80) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.amqp.DeserializationInput.deserialize (DeserializationInput.kt: 96) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme.deserialize (AMQPSerializationScheme.kt: 123) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.SerializationFactoryImpl $ десериализуют $ 1 $ 1.invoke (SerializationScheme.kt: 111) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.core.serialization.SerializationFactory.withCurrentContext (SerializationAPI.kt: 66) ~ [corda-core-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.SerializationFactoryImpl $ deserialize $ 1.invoke (SerializationScheme.kt: 111) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.SerializationFactoryImpl $ deserialize $ 1.invoke (SerializationScheme.kt: 86) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.core.serialization.SerializationFactory.asCurrent (SerializationAPI.kt: 80) ~ [corda-core-3.3-corda.jar :?] at net.corda.nodeapi.internal.serialization.SerializationFactoryImpl.deserialize (SerializationScheme.kt: 111) ~ [corda-node-api-3.3-corda.jar :?] at net.corda.node.services.statemachine.FlowStateMachineImplKt.checkPayloadIs (FlowStateMachineImpl.kt: 603) ~ [corda-node-3.3-corda.jar:?]

Здесь инициируйте код потока

package th.co.jventures.ddlp.cordapp.flows

import co.paralleluniverse.fibers.Suspendable
import net.corda.confidential.IdentitySyncFlow
import net.corda.core.contracts.Command
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.contracts.requireThat
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.unwrap
import th.co.jventures.ddlp.cordapp.contracts.CustomerContract
import th.co.jventures.ddlp.cordapp.core.commons.CordappConstants
import th.co.jventures.ddlp.cordapp.dto.CustomerIssueData
import th.co.jventures.ddlp.cordapp.services.CustomerService
import th.co.jventures.ddlp.cordapp.states.Address
import th.co.jventures.ddlp.cordapp.states.Consent
import th.co.jventures.ddlp.cordapp.states.CustomerState
import th.co.jventures.ddlp.cordapp.utils.CordappValidationUtils.Companion.requiredNotWhen
import java.util.*

@InitiatingFlow
@StartableByRPC
class CustomerIssueFlow(private val customerIssueData: CustomerIssueData) : FlowLogic<String>() {

    companion object {
        object GENERATING_TRANSACTION : ProgressTracker.Step("Generating transaction based on new IOU.")
        object VERIFYING_TRANSACTION : ProgressTracker.Step("Verifying contract constraints.")
        object SIGNING_TRANSACTION : ProgressTracker.Step("Signing transaction with our private key.")
        object GATHERING_SIGS : ProgressTracker.Step("Gathering the counterparty's signature.") {
            override fun childProgressTracker() = CollectSignaturesFlow.tracker()
        }

        object FINALISING_TRANSACTION : ProgressTracker.Step("Obtaining notary signature and recording transaction.") {
            override fun childProgressTracker() = FinalityFlow.tracker()
        }

        fun tracker() = ProgressTracker(
                GENERATING_TRANSACTION,
                VERIFYING_TRANSACTION,
                SIGNING_TRANSACTION,
                GATHERING_SIGS,
                FINALISING_TRANSACTION
        )
    }

    override val progressTracker = tracker()

    @Suspendable
    override fun call(): String {
        // Obtain a reference to the notary we want to use.
        val notary = serviceHub.networkMapCache.notaryIdentities[0]

        // Stage 1.
        progressTracker.currentStep = GENERATING_TRANSACTION
        // Generate an unsigned transaction.

        val customerService = serviceHub.cordaService(CustomerService::class.java)

        //#################################################################//
        val pahNode = serviceHub.networkMapCache.getPeerByLegalName(CordappConstants.Node.PAH) as Party
        val participants = listOf(ourIdentity, pahNode)

        val customerStateAndRef = customerService.findByIdCardNoAndPartner(idCarNo = customerIssueData.customerData.idCardNo!!, partner = customerIssueData.customerData.partner!!)

        //check duplicate customer
        requiredNotWhen(customerStateAndRef == null, "Customer already registered")

        val addresses = customerIssueData.customerData.addresses?.map { it ->
            Address(
                    seqId = it.seqId,
                    id = UUID.randomUUID().toString(),
                    type = it.type,
                    houseNumber = it.houseNumber,
                    street = it.street,
                    subDistrict = it.subDistrict,
                    district = it.district,
                    province = it.province,
                    postalCode = it.postalCode,
                    countryCode = it.countryCode,
                    mobile = it.mobile,
                    telephone = it.telephone
            )
        }

        val consentsArray = customerIssueData.customerData.consents?.map { it ->
            Consent(
                    id = UUID.randomUUID().toString(),
                    partner = it.partner,
                    product = it.product,
                    reference = it.reference,
                    createDate = it.createDate
            )
        }

        val linearId = UniqueIdentifier()

        val customerState = CustomerState(
                linearId = linearId,
                externalCustomerNo = customerIssueData.customerData.externalCustomerNo!!,
                title = customerIssueData.customerData.title!!,
                firstName = customerIssueData.customerData.firstName!!,
                middleName = customerIssueData.customerData.middleName!!,
                lastName = customerIssueData.customerData.lastName!!,
                idCardNo = customerIssueData.customerData.idCardNo!!,
                email = customerIssueData.customerData.email!!,
                partner = customerIssueData.customerData.partner!!,
                product = customerIssueData.customerData.product!!,
                consents = consentsArray,
                status = CordappConstants.CustomerStatus.Active,
                addresses = addresses,
                participants = participants,
                createDate = Date(),
                createBy = customerIssueData.customerData.username!!,
                changeDate = Date(),
                changeBy = customerIssueData.customerData.username!!
        )

        val txCommand = Command(CustomerContract.Commands.Issue(), customerState.participants.map { it.owningKey })
        val txBuilder = TransactionBuilder(notary)
                .addOutputState(customerState, CustomerContract.CONTRACT_ID)
                .addCommand(txCommand)

        //#################################################################//
        // Stage 2.
        progressTracker.currentStep = VERIFYING_TRANSACTION
        // Verify that the transaction is valid.
        txBuilder.verify(serviceHub)

        // Stage 3.
        progressTracker.currentStep = SIGNING_TRANSACTION
        // Sign the transaction.
        val partSignedTx = serviceHub.signInitialTransaction(txBuilder)

        // Stage 4.
        progressTracker.currentStep = GATHERING_SIGS
        // Send the state to the counterparty, and receive it back with their signature.
        val session = (customerState.participants - ourIdentity).map { initiateFlow(it) }
        val fullySignedTx = subFlow(CollectSignaturesFlow(partSignedTx, session, GATHERING_SIGS.childProgressTracker()))

        // Stage 5.
        progressTracker.currentStep = FINALISING_TRANSACTION
        // Notarise and record the transaction in both parties' vaults.

        subFlow(FinalityFlow(fullySignedTx, FINALISING_TRANSACTION.childProgressTracker()))
        return customerState.linearId.toString()
    }

}

@InitiatedBy(CustomerIssueFlow::class)
class CustomerIssueFlowResponder(val otherSideSession: FlowSession) : FlowLogic<SignedTransaction>() {// TwoPartyDealFlow.Acceptor(otherSideSession) {

    @Suspendable
    override fun call(): SignedTransaction {

        // val finalTx = super.call()
        println("######## START CustomerIssueFlowResponder ########################################################")
        println(ourIdentity)
        val signTransactionFlow = object : SignTransactionFlow(otherSideSession) {
            override fun checkTransaction(stx: SignedTransaction) = requireThat {
                // val output = stx.tx.outputs.single().data
                //"This must be an CustomerState." using (output is CustomerState)
            }
        }

        val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false))
        val customerState = stx.tx.outputs.single().data as CustomerState
        subFlow(CustomerPAHIssueOrUpdateFlow(customerState))
        println("######## END CustomerIssueFlowResponder ########################################################")

        return subFlow(signTransactionFlow)


    }
}

И подпоток

package th.co.jventures.ddlp.cordapp.flows

import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Command
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.contracts.requireThat
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.loggerFor
import th.co.jventures.ddlp.cordapp.contracts.CustomerContract
import th.co.jventures.ddlp.cordapp.core.commons.CordappConstants
import th.co.jventures.ddlp.cordapp.services.CustomerService
import th.co.jventures.ddlp.cordapp.states.CustomerState

@InitiatingFlow
@StartableByService
@StartableByRPC
class CustomerPAHIssueOrUpdateFlow(private val customerState: CustomerState) : FlowLogic<String>() {

    companion object {
        val log = loggerFor<CustomerPAHIssueOrUpdateFlow>()

        object GENERATING_TRANSACTION : ProgressTracker.Step("Generating transaction based on new IOU.")
        object VERIFYING_TRANSACTION : ProgressTracker.Step("Verifying contract constraints.")
        object SIGNING_TRANSACTION : ProgressTracker.Step("Signing transaction with our private key.")
        object GATHERING_SIGS : ProgressTracker.Step("Gathering the counterparty's signature.") {
            override fun childProgressTracker() = CollectSignaturesFlow.tracker()
        }

        object FINALISING_TRANSACTION : ProgressTracker.Step("Obtaining notary signature and recording transaction.") {
            override fun childProgressTracker() = FinalityFlow.tracker()
        }

        fun tracker() = ProgressTracker(
                GENERATING_TRANSACTION,
                VERIFYING_TRANSACTION,
                SIGNING_TRANSACTION,
                GATHERING_SIGS,
                FINALISING_TRANSACTION
        )
    }

    override val progressTracker = tracker()

    @Suspendable
    override fun call(): String {
        // Obtain a reference to the notary we want to use.
        val notary = serviceHub.networkMapCache.notaryIdentities[0]

        // Stage 1.
        progressTracker.currentStep = GENERATING_TRANSACTION
        // Generate an unsigned transaction.

        val customerService = serviceHub.cordaService(CustomerService::class.java)

        //#################################################################//
        val pahNode = serviceHub.networkMapCache.getPeerByLegalName(CordappConstants.Node.PAH) as Party
        //val participants = listOf(ourIdentity, pahNode)

        val customerStateAndRef = customerService.findByIdCardNoAndPartner(idCarNo = customerState.idCardNo!!, partner = CordappConstants.PartnerNameList.PAH)

        val txBuilder = TransactionBuilder(notary)
        if (customerStateAndRef == null) {
            log.info("===================================================")
            val linearId = UniqueIdentifier()
            val customerOutputState = customerState.copy(linearId = linearId, participants = listOf(pahNode), partner = CordappConstants.PartnerNameList.PAH)
            val txCommand = Command(CustomerContract.Commands.Issue(), customerState.participants.map { it.owningKey })

            log.info("Create new JVC Customer : {}", customerOutputState)

            txBuilder.addOutputState(customerOutputState, CustomerContract.CONTRACT_ID)
                    .addCommand(txCommand)
            log.info("===================================================")
        } else {
            log.info("===================================================")
            val txCommand = Command(CustomerContract.Commands.Update(), customerState.participants.map { it.owningKey })
            val customerOutputState = customerState.copy(participants = listOf(pahNode), partner = CordappConstants.PartnerNameList.PAH)

            log.info("Update existing JVC Customer : {}", customerOutputState)

            txBuilder.addInputState(customerStateAndRef)
            txBuilder.addOutputState(customerOutputState, CustomerContract.CONTRACT_ID)
                    .addCommand(txCommand)
            log.info("===================================================")
        }

        //#################################################################//
        // Stage 2.
        progressTracker.currentStep = VERIFYING_TRANSACTION
        // Verify that the transaction is valid.
        txBuilder.verify(serviceHub)

        // Stage 3.
        progressTracker.currentStep = SIGNING_TRANSACTION
        // Sign the transaction.
        val partSignedTx = serviceHub.signInitialTransaction(txBuilder)

        // Stage 4.
        progressTracker.currentStep = GATHERING_SIGS
        // Send the state to the counterparty, and receive it back with their signature.
        val session = (customerState.participants - ourIdentity).map { initiateFlow(it) }
        val fullySignedTx = subFlow(CollectSignaturesFlow(partSignedTx, session, GATHERING_SIGS.childProgressTracker()))

        // Stage 5.
        progressTracker.currentStep = FINALISING_TRANSACTION
        // Notarise and record the transaction in both parties' vaults.

        /*For syndicate loan phase
        //Initial default wallet with 0.00 THB
        subFlow(WalletIssueFlow(WalletIssueData(
                amount = BigDecimal.ZERO,
                currency = DEFAULT_CURRENCY,
                customerId = linearId.id.toString()
                )))
        */

        subFlow(FinalityFlow(fullySignedTx, FINALISING_TRANSACTION.childProgressTracker()))
        return customerState.linearId.toString()
    }

}

@InitiatedBy(CustomerPAHIssueOrUpdateFlow::class)
class CustomerPAHIssueOrUpdateFlowResponder(val otherPartyFlow: FlowSession) : FlowLogic<SignedTransaction>() {
    @Suspendable
    override fun call(): SignedTransaction {
        val signTransactionFlow = object : SignTransactionFlow(otherPartyFlow) {
            override fun checkTransaction(stx: SignedTransaction) = requireThat {
                //val output = stx.tx.outputs.single().data
               // "This must be an CustomerState." using (output is CustomerState)
            }
        }

        return subFlow(signTransactionFlow)
    }
}

1 Ответ

0 голосов
/ 15 января 2019

Эта проблема не связана с вызовом подпотока из потока. Вместо этого проблема находится в этой строке в CustomerIssueFlowResponder:

val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false))

Нет соответствующей отправки транзакции с другой стороны потока, что вызывает исключение типа:

java.io.NotSerializableException: описанный тип с дескриптором net.corda: ожидается, что Rd6hxg + 0oJROxKDXK8OerA == будет иметь класс типа net.corda.core.transactions.SignedTransaction, но был java.util.List

Вместо вызова ReceiveTransactionFlow просто используйте транзакцию, возвращаемую SignTransactionFlow.

...