Несоответствие и резкое поведение фильтра Spark, текущей временной метки и пользовательского приемника HBase в структурированной потоковой передаче Spark - PullRequest
0 голосов
/ 26 октября 2018

У меня есть таблица HBase, которая выглядит следующим образом в статическом Dataframe как HBaseStaticRecorddf

---------------------------------------------------------------
|rowkey|Name|Number|message|lastTS|
|-------------------------------------------------------------|
|266915488007398|somename|8759620897|Hi|1539931239            |
|266915488007399|somename|8759620898|Welcome|1540314926       |
|266915488007400|somename|8759620899|Hello|1540315092         |
|266915488007401|somename|8759620900|Namaskar|1537148280      |
 --------------------------------------------------------------

Теперь у меня есть источник файлового потока, из которого я получу потоковый ключ строки.Теперь эта отметка времени (lastTS) для потоковой передачи rowkey должна быть проверена независимо от того, являются ли они старше одного дня или нет.Для этого у меня есть следующий код, где joinDF - это потоковый DataFrame, который формируется путем объединения другого потокового DataFrame и статического фрейма данных HBase следующим образом.

val HBaseStreamDF = HBaseStaticRecorddf.join(anotherStreamDF,"rowkey")

val newdf = HBaseStreamDF.filter(HBaseStreamDF.col("lastTS").cast("Long") < ((System.currentTimeMillis - 86400*1000)/1000))//records older than one day are eligible to get updated

После того, как фильтр закончен, я хочу сохранить эту записьна HBase, как показано ниже.

   newDF.writeStream
  .foreach(new ForeachWriter[Row] {
    println("inside foreach")

    val tableName: String = "dummy"
    val hbaseConfResources: Seq[String] = Seq("hbase-site.xml")
    private var hTable: Table = _
    private var connection: Connection = _


    override def open(partitionId: Long, version: Long): Boolean = {
      connection = createConnection()
      hTable = getHTable(connection)
      true
    }

    def createConnection(): Connection = {
      val hbaseConfig = HBaseConfiguration.create()
      hbaseConfResources.foreach(hbaseConfig.addResource)
      ConnectionFactory.createConnection(hbaseConfig)

    }

    def getHTable(connection: Connection): Table = {
      connection.getTable(TableName.valueOf(tableName))
    }

    override def process(record: Row): Unit = {
      var put = saveToHBase(record)
      hTable.put(put)
    }

    override def close(errorOrNull: Throwable): Unit = {
      hTable.close()
      connection.close()
    }

    def saveToHBase(record: Row): Put = {
    val p = new Put(Bytes.toBytes(record.getString(0)))
    println("Now updating HBase for " + record.getString(0))


      p.add(Bytes.toBytes("messageInfo"),
        Bytes.toBytes("ts"),
        Bytes.toBytes((System.currentTimeMillis/1000).toString)) //saving as second

      p
    }

  }
  ).outputMode(OutputMode.Update())
  .start().awaitTermination()

Теперь, когда поступает любая запись, HBase обновляется только в первый раз.Если та же самая запись появляется позже, она просто игнорируется и не работает.Однако, если появляется какая-то уникальная запись, которая не была обработана приложением Spark, она работает.Так что любая дублированная запись не обрабатывается во второй раз.

Теперь вот кое-что интересное.

  1. Если я удаляю вычитание 86400 сек из (System.currentTimeMillis - 86400 * 1000) / 1000), то все обрабатывается, даже если среди входящих записей есть избыточность.Но это не предназначено и полезно, поскольку не фильтрует записи на 1 день старше.

  2. Если я выполняю сравнение в условиях фильтра в миллисекундах без деления на 1000 (для этого также требуются данные HBaseв миллисекунду) и сохраните запись как секунду в положенном объекте, после чего снова все будет обработано.Но если я изменю формат на секунды в объекте put, он не будет работать.

Я попытался по отдельности протестировать фильтр и HBase, и они оба работают нормально.Но вместе они портятся, если System.currentTimeMillis в фильтре имеет некоторые арифметические операции, такие как / 1000 или -864000.Если я удаляю часть приемника HBase и использую

newDF.writeStream.format("console").start().awaitTermination()

, тогда снова работает логика фильтра.И если я уберу фильтр, то мойка HBase будет работать нормально.Но вместе пользовательский приемник для HBase впервые работает только для уникальных записей.Я попробовал несколько других логик фильтра, как показано ниже, но проблема остается той же.

val newDF = newDF1.filter(col("lastTS").lt(LocalDateTime.now().minusDays(1).toEpochSecond(ZoneOffset.of("+05:30"))))

или

val newDF = newDF1.filter(col("lastTS").cast("Long") < LocalDateTime.now().minusDays(1).toEpochSecond(ZoneOffset.of("+05:30")))

Как заставить фильтр работать и сохранять отфильтрованные записи в HBase с обновленнымотметка времени?Я взял ссылку на несколько других постов.Но результат тот же.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...