Как гарантировать процесс только один раз для вызовов REST API для всех записей кадра данных SPARK - PullRequest
1 голос
/ 22 мая 2019

Я хотел использовать foreachPartition на фрейме данных, чтобы отправлять данные каждой строки ТОЛЬКО один раз в REST API.

val aDF= ... ///sc.parallelize(0 to 1000000,4) i.e a dataframe ~1M rows
  aDF.foreachPartition(rows => {
    val hbaseConn : Connection =  ... // SET UP DB conn ==> some settings for Connection Config returns a HBaseConnection
    val httpClient : CloseableHttpClient = RESTUtil.getRestClient(...) //SET UP a http client 
    val auditHbaseTable : HBaseUtil = new HBaseUtil(hbaseConn, ...) //HBaseUtil is for DB table interactions

    val httpPost = new HttpPost("some_URL")
    httpPost.setHeader("Content-type", "application/json")

    rows.foreach( row => {
      try {
        val payload = createPayload(row)
        val entity = new StringEntity(JsonSerializationUtil.toJson(payload))
        httpPost.setEntity(entity)
        val response = httpClient.execute(httpPost)
        val responseStr = EntityUtils.toString(response.getEntity)
        val rowKey = UUID.randomUUID.toString
        auditHbaseTable.hbaseRecordWriter(rowKey, responseStr)
      }
      catch{
        case unknown => logger.info("Exception:" + unknown)
      }
     })
    })

Это приводит к повторным отправкам, скорее всего, из-за неудачной повторной попытки задачи.Я отслеживаю отправленные полезные данные в таблице hbase для диагностики.Здесь, в таблице hbase аудита, я вижу, что для некоторых полезных нагрузок делается несколько записей для одной полезной нагрузки.Существуют повторные вставки для уже обработанной полезной нагрузки, которая совпадает с метками времени повторных задач.

Мне нужны предложения (с кодами и ссылками, если возможно), чтобы гарантировать, что искра запускается только один раз для каждой строки / полезной нагрузки.

Один из превентивных способов, который я мог придумать, - это создать UUID при запуске и продолжать проверять в HBASE, если он был отправлен ранее, но он сталкивается с проблемами производительности.

val generateUUID=udf(()=> UUID.randomUUID().toString)
  val aDF2=aDF.withColumn("rowKey",generateUUID())
  aDF2.foreachPartition(rows => {
      ...
    rows.foreach( row => {
      try {
        if (auditHbaseTable.hbaseReader(rowKey).isEmpty){
            ...
        }
      }
        ...
    })
  })

Итак, кто-то может мне помочьс лучшим способом сделать это?Является ли 'foreachPartiton' правильным способом для этого?

Я также смотрю на фильтры Hbase Bloom, чтобы ускорить вышеописанные реализации, но я не знаю, как их использовать.Любые предложения для этого также приветствуются.

...