Сохранить текущее смещение Кафки.Я могу сохранить только первое или последнее смещение - PullRequest
0 голосов
/ 15 ноября 2018

Сохраняю смещение темы Кафка в таблице на Hbase.Я использую эту тему для запланированного задания Spark и для сохранения смещения использую этот метод:

def saveOffset(offsetRanges: Array[OffsetRange], tableOfOffset: org.apache.hadoop.hbase.client.Table): Unit = {
if (offsetRanges == null || tableOfOffset == null)
  throw new IllegalArgumentException("CAUSE: offsetRanges, tableOfOffset")
else {
  offsetRanges.foreach(row => {
    val put = new Put(row.topic.getBytes())
    put.addColumn(Main, "PARTITION".getBytes(), Bytes.toBytes(row.partition))
    put.addColumn(Main, "STARTOFFSET".getBytes(), Bytes.toBytes(row.untilOffset))
    tableOfOffset.put(put)
  })
}}

Каждый раз, когда я запускаю задание, я читаю сообщения в теме, запускаю foreachRDD и сохраняю смещениепосле правильной записи записи в Hbase.
Этот метод сохраняет последнее смещение из OffsetRange даже после первой итерации.Если я изменяю row.untilOffset в row.fromOffset, я сохраняю каждый раз первое смещение в диапазоне (0).Мне нужно сохранить текущее смещение, потому что, если моя работа падает с диапазона 2, мне нужно начинать с диапазона 2 при следующем запуске задания.

Как сохранить смещение текущего сообщения от Кафки?Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...