Невозможно сохранить использованные смещения Kafka в таблице HBase при фиксации через Spark D Streams - PullRequest
2 голосов
/ 18 января 2020

Я пытаюсь сохранить смещение Kafka Consumer в таблице HBase с флагом успеха после его обработки в бизнес-логике c. Весь этот процесс является частью Spark DStream, и я использую приведенный ниже фрагмент кода для этого:

    val hbaseTable = "table"

    val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "server",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "topic",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean))

    val topic = Array("topicName")    

    val ssc = new StreamingContext(sc, Seconds(40))
    val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String] (topic, kafkaParams))

        stream.foreachRDD((rdd, batchTime) => {
              val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
              offsetRanges.foreach(offset => println(offset.topic, offset.partition, 
              offset.fromOffset, offset.untilOffset))

              rdd.map(value => (value.value())).saveAsTextFile("path")
              println("Saved Data into file")

              var commits:OffsetCommitCallback = null
              rdd.foreachPartition(message => {
                  val hbaseConf = HBaseConfiguration.create()
                  val conn = ConnectionFactory.createConnection(hbaseConf)
                  val table = conn.getTable(TableName.valueOf(hbaseTable))
                  commits = new OffsetCommitCallback(){
                      def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
                      exception: Exception) {
                          message.foreach(value => {
                          val key= value.key()
                          val offset = value.offset()
                          println(s"offset is: $offset")
                          val partitionId = TaskContext.get.partitionId()       
                          println(s"partitionID is: $partitionId")
                          val rowKey = key
                          val put = new Put(rowKey.getBytes)
                          if (exception != null) {
                             println("Got Error Message:" + exception.getMessage)
                             put.addColumn("o".getBytes, "flag".toString.getBytes(),"Error".toString.getBytes())
                         put.addColumn("o".getBytes,"error_message".toString.getBytes(),exception.getMessage.toString.getBytes())
                             println("Got Error Message:" + exception.getMessage)
                             table.put(put)
                           } else {
                              put.addColumn("o".getBytes, "flag".toString.getBytes(),"Success".toString.getBytes())
                              table.put(put)
                              println(offsets.values())
                            }
                        }
                     )
            println("Inserted into HBase")
             }
        }
        table.close()
        conn.close()
        }
        )
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, commits)
      }

     )
    ssc.start()

Этот код выполняется успешно. Тем не менее, он не сохраняет данные в HBase и не создает журналы на уровне исполнителя (которые я печатаю, повторяя каждый раздел RDD). Не уверен, что именно мне здесь не хватает. Любая помощь будет принята с благодарностью.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...