Я пытаюсь сохранить смещение Kafka Consumer в таблице HBase с флагом успеха после его обработки в бизнес-логике c. Весь этот процесс является частью Spark DStream, и я использую приведенный ниже фрагмент кода для этого:
val hbaseTable = "table"
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "server",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "topic",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topic = Array("topicName")
val ssc = new StreamingContext(sc, Seconds(40))
val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String] (topic, kafkaParams))
stream.foreachRDD((rdd, batchTime) => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(offset => println(offset.topic, offset.partition,
offset.fromOffset, offset.untilOffset))
rdd.map(value => (value.value())).saveAsTextFile("path")
println("Saved Data into file")
var commits:OffsetCommitCallback = null
rdd.foreachPartition(message => {
val hbaseConf = HBaseConfiguration.create()
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf(hbaseTable))
commits = new OffsetCommitCallback(){
def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
exception: Exception) {
message.foreach(value => {
val key= value.key()
val offset = value.offset()
println(s"offset is: $offset")
val partitionId = TaskContext.get.partitionId()
println(s"partitionID is: $partitionId")
val rowKey = key
val put = new Put(rowKey.getBytes)
if (exception != null) {
println("Got Error Message:" + exception.getMessage)
put.addColumn("o".getBytes, "flag".toString.getBytes(),"Error".toString.getBytes())
put.addColumn("o".getBytes,"error_message".toString.getBytes(),exception.getMessage.toString.getBytes())
println("Got Error Message:" + exception.getMessage)
table.put(put)
} else {
put.addColumn("o".getBytes, "flag".toString.getBytes(),"Success".toString.getBytes())
table.put(put)
println(offsets.values())
}
}
)
println("Inserted into HBase")
}
}
table.close()
conn.close()
}
)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, commits)
}
)
ssc.start()
Этот код выполняется успешно. Тем не менее, он не сохраняет данные в HBase и не создает журналы на уровне исполнителя (которые я печатаю, повторяя каждый раздел RDD). Не уверен, что именно мне здесь не хватает. Любая помощь будет принята с благодарностью.