Я хотел использовать foreachPartition на фрейме данных, чтобы отправлять данные каждой строки ТОЛЬКО один раз в REST API.
val aDF= ... ///sc.parallelize(0 to 1000000,4) i.e a dataframe ~1M rows
aDF.foreachPartition(rows => {
val hbaseConn : Connection = ... // SET UP DB conn ==> some settings for Connection Config returns a HBaseConnection
val httpClient : CloseableHttpClient = RESTUtil.getRestClient(...) //SET UP a http client
val auditHbaseTable : HBaseUtil = new HBaseUtil(hbaseConn, ...) //HBaseUtil is for DB table interactions
val httpPost = new HttpPost("some_URL")
httpPost.setHeader("Content-type", "application/json")
rows.foreach( row => {
try {
val payload = createPayload(row)
val entity = new StringEntity(JsonSerializationUtil.toJson(payload))
httpPost.setEntity(entity)
val response = httpClient.execute(httpPost)
val responseStr = EntityUtils.toString(response.getEntity)
val rowKey = UUID.randomUUID.toString
auditHbaseTable.hbaseRecordWriter(rowKey, responseStr)
}
catch{
case unknown => logger.info("Exception:" + unknown)
}
})
})
Это приводит к повторным отправкам, скорее всего, из-за неудачной повторной попытки задачи.Я отслеживаю отправленные полезные данные в таблице hbase для диагностики.Здесь, в таблице hbase аудита, я вижу, что для некоторых полезных нагрузок делается несколько записей для одной полезной нагрузки.Существуют повторные вставки для уже обработанной полезной нагрузки, которая совпадает с метками времени повторных задач.
Мне нужны предложения (с кодами и ссылками, если возможно), чтобы гарантировать, что искра запускается только один раз для каждой строки / полезной нагрузки.
Один из превентивных способов, который я мог придумать, - это создать UUID при запуске и продолжать проверять в HBASE, если он был отправлен ранее, но он сталкивается с проблемами производительности.
val generateUUID=udf(()=> UUID.randomUUID().toString)
val aDF2=aDF.withColumn("rowKey",generateUUID())
aDF2.foreachPartition(rows => {
...
rows.foreach( row => {
try {
if (auditHbaseTable.hbaseReader(rowKey).isEmpty){
...
}
}
...
})
})
Итак, кто-то может мне помочьс лучшим способом сделать это?Является ли 'foreachPartiton' правильным способом для этого?
Я также смотрю на фильтры Hbase Bloom, чтобы ускорить вышеописанные реализации, но я не знаю, как их использовать.Любые предложения для этого также приветствуются.