В идеале должен быть генератор заглушки, который генерирует соответствующие вызовы как suspend
/ Flow
методы, но вы все равно можете абстрагировать большую часть преобразования с помощью специальной вспомогательной функции:
fun <T> grpcFlow(
@BuilderInference block: suspend (StreamObserver<T>) -> Unit
): Flow<T> = callbackFlow {
// Use ClientCallStreamObserver to support cancellation
val observer = object : ClientCallStreamObserver<T>() {
override fun onNext(value: T) {
sendBlocking(value)
}
override fun onError(t: Throwable) {
cancel(CancellationException("GRPC Error", t))
}
override fun onCompleted() = channel.close()
}
block(observer)
awaitClose {
observer.cancel("flow cancellation", null)
}
}
Тогда ваш API просто становится:
class API {
val nonBlockingStub: HealthcareAPIGrpc.HealthcareAPIStub
suspend fun exampleRequest(params: Params) = grpcFlow {
// @BuilderInference should ensure the correct type is used
nonBlockingStub.exampleRequest(params, it)
}.single() // Since we only expect a single response value.
// And for returns (stream ReturnedResult)
suspend fun exampleStreamingRequest(params: Params) = gcpcFlow {
nonBlockingStub.exampleStreamingRequest(params, it)
} // Flow<ReturnedResult>
}