Это сообщение об ошибке указывает, что:
- В рамках потока вы отправляете сообщение контрагенту
- Контрагент зарегистрировал поток ответов, чтобы ответить на поток, отправляющий сообщение, но он не ожидает получения сообщения в это время
Есть несколько причин, по которым это может произойти. Поток ответов, возможно, уже закончился, или он может не ожидать получения сообщения в это время.
Вы говорите, что сообщение об ошибке появляется только во второй раз. Учитывая детали, которые у меня есть, я бы предположил, что ваш исходный поток пытается заставить контрагента несколько раз вызывать поток ответов. Что-то вроде:
@InitiatingFlow
@StartableByRPC
class InitiatorFlow(val counterparty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val counterpartySession = initiateFlow(counterparty)
(0..99).forEach {
counterpartySession.send("My payload.")
}
}
}
@InitiatedBy(InitiatorFlow::class)
class ResponderFlow(val counterpartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
counterpartySession.receive<String>()
}
}
Это не будет работать. Когда вы запускаете исходный поток, ему присваивается идентификатор потока. Этот идентификатор используется при общении с контрагентами, чтобы сообщить им, какой поток ответов использовать:
- Если контрагент никогда не получал сообщение из потока с этим идентификатором, он создаст новый экземпляр потока ответов
- Если контрагент ранее получал сообщение из потока с этим идентификатором, он продолжит использовать этот экземпляр
- Если этот экземпляр потока завершил работу, он не создаст новый экземпляр, но предположит, что произошла ошибка, с сообщением об ошибке, которое вы видите выше
Вместо этого вам нужно будет использовать подпотоки для создания нового идентификатора потока для каждого взаимодействия с контрагентом. Что-то вроде:
@InitiatingFlow
@StartableByRPC
class InitiatorFlow(val counterparty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
(0..99).forEach {
subFlow(SendMessageFlow(counterparty))
}
}
}
@InitiatingFlow
class SendMessageFlow(val counterparty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val counterpartySession = initiateFlow(counterparty)
counterpartySession.send("My payload.")
}
}
@InitiatedBy(SendMessageFlow::class)
class ResponderFlow(val counterpartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
counterpartySession.receive<String>()
}
}