В приложении оно будет извлекать данные из нескольких конечных точек для создания объединенного потока для вызывающей стороны, поэтому для конечной точки будет несколько FetchRemoteDataCommand
один.как
val fetchers = getFetchers (...) //returns ArrayList<FetchRemoteDataCommand> with different endpoints
for (f in fetchers) {
f.doPost()
}
В настоящее время он не использует сопрограммы, но в зависимости от обратного вызова onDataFetchComplete
для передачи результата обратно вызывающей стороне.И вызывающий должен управлять состоянием каждого сборщика, чтобы, наконец, объединить все результаты каждого сборщика.
Возможно, сопрограммы помогут в этом.
Как это сделать с сопрограммамичто ждут, пока все сборщики завершат свой вызов дооснащения, а затем построят агрегированный поток из результата всех сборщиков?
класс FetchRemoteDataCommand
class FetchRemoteDataCommand<T>(val baseUrl: String,
val path: String,
var headers: HashMap<String, String>,
var params: HashMap<String, String>,
val body: okhttp3.RequestBody? = null,
callback: ICallback<IData<T>>,
var httpClient: OkHttpClient,
POJOClassType: Class<T>) {
private var fetchCompleteCallback: ICallback<IGenericData<T>> = callback
val mPOJOClazz: Class<T> = POJOClassType
var resultData: T? = null
fun getResult(): T? {
return resultData
}
suspend fun doPost() {
val service = createRetrofitService(IRemoteDataRequest::class.java, baseUrl, httpClient)
service.postForData(path, headers, params, body!!).enqueue(object : Callback<ResponseBody> {
override fun onResponse(call: Call<ResponseBody>, response: Response<ResponseBody>) {
try {
val jsonString: String = response.body()?.string() ?: ""
resultData = try {
Gson().fromJson<T>(jsonString, mPOJOClazz)
} catch (e: Exception) {
null
}
val headerList = response.headers()
val errorBodyString = response.errorBody()?.string()
val success = response.isSuccessful
val contentType = response.body()?.contentType()
val code = response.code() ?: -1
var message = response.message() ?: ""
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).execute {
fetchCompleteCallback.onDataFetchComplete(resultData, success, code, message, errorBodyString, contentType, headerList)
}
} finally {
response.body()?.close()
}
}
override fun onFailure(call: Call<ResponseBody>, t: Throwable) {
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()).execute {
fetchCompleteCallback.onDataFetchComplete(null, false, -1, t.message ?: "", null, null, null)
}
}
})
}
fun <T> createRetrofitService(clazz: Class<T>, endPoint: String, httpClient: OkHttpClient): T {
val restAdapter = Retrofit.Builder()
.baseUrl(endPoint)
.addConverterFactory(GsonConverterFactory.create())
.client(httpClient)
.build()
return restAdapter.create(clazz)
}
}
interface IRemoteDataRequest {
@POST
fun postForData(@Url url: String,
@HeaderMap headers: Map<String, String>,
@QueryMap params: Map<String, String>?, @Body body: RequestBody): Call<ResponseBody>
}