Сохраняю смещение темы Кафка в таблице на Hbase.Я использую эту тему для запланированного задания Spark и для сохранения смещения использую этот метод:
def saveOffset(offsetRanges: Array[OffsetRange], tableOfOffset: org.apache.hadoop.hbase.client.Table): Unit = {
if (offsetRanges == null || tableOfOffset == null)
throw new IllegalArgumentException("CAUSE: offsetRanges, tableOfOffset")
else {
offsetRanges.foreach(row => {
val put = new Put(row.topic.getBytes())
put.addColumn(Main, "PARTITION".getBytes(), Bytes.toBytes(row.partition))
put.addColumn(Main, "STARTOFFSET".getBytes(), Bytes.toBytes(row.untilOffset))
tableOfOffset.put(put)
})
}}
Каждый раз, когда я запускаю задание, я читаю сообщения в теме, запускаю foreachRDD и сохраняю смещениепосле правильной записи записи в Hbase.
Этот метод сохраняет последнее смещение из OffsetRange даже после первой итерации.Если я изменяю row.untilOffset
в row.fromOffset
, я сохраняю каждый раз первое смещение в диапазоне (0).Мне нужно сохранить текущее смещение, потому что, если моя работа падает с диапазона 2, мне нужно начинать с диапазона 2 при следующем запуске задания.
Как сохранить смещение текущего сообщения от Кафки?Спасибо