Как создать соединения на hbase с помощью Spark Streaming - PullRequest
0 голосов
/ 08 мая 2019

Какой лучший способ открыть и закрыть соединение на hbase с искровой потоковой передачей?

Я сделал следующий код, но я не знаю, является ли правильным:

object hbaseConnection extends Serializable {
    lazy val connection = {
      val config = HBaseConfiguration.create();
      val connection = ConnectionFactory.createConnection(config);
      val table = connection.getTable(TableName.valueOf("smartcommerce:teste"))
      table
    }
}

Я использую Lazyсоединение в foreachpartition, но я не знаю, где закрыто соединение.

object StreamApp extends Serializable {

  def main(args: Array[String]): Unit = {

    val c = new Converter();

    val ss = SparkSession.builder()
      .config("spark.dynamicAllocation.enabled", true)
      .config("spark.shuffle.service.enabled", true).config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict").enableHiveSupport().getOrCreate();

    val sc = ss.sparkContext
    val sqlContext = new SQLContext(sc)

    val conf = new SparkConf().setAppName("Simple Streaming Application")

    val ssc = new StreamingContext(sc, Seconds(10))

    val kafkaParams = Map[String, Object](
      //      "bootstrap.servers" -> "hnodevp024.prd.bigdata.dc.nova:6667,hnodevp025.prd.bigdata.dc.nova:6667,hnodevp026.prd.bigdata.dc.nova:6667,hnodevp027.prd.bigdata.dc.nova:6667,hnodevp028.prd.bigdata.dc.nova:6667,hnodevp029.prd.bigdata.dc.nova:6667",
      "bootstrap.servers" -> "hnodevh002.hlg.bigdata.dc.nova:6667,hnodevh005.hlg.bigdata.dc.nova:6667",
      "group.id" -> "pedidos_stream",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "security.protocol" -> "PLAINTEXTSASL",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array("TESTE_STREAMING")
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val t = stream.map(m => c.jsonConverter(m.value()))

    t.foreachRDD { rdd =>
      import sqlContext.implicits._
      val df = rdd.toDF()

      rdd.foreachPartition { fpartition =>
        val table = connection

        fpartition.foreach { f =>
          val put = new Put(Bytes.toBytes(f.rowkey.toString()))

          put.addColumn(Bytes.toBytes("dados"), Bytes.toBytes("idCompra"), Bytes.toBytes(f.IdCompra.toString()))
          put.addColumn(Bytes.toBytes("dados"), Bytes.toBytes("IdCompraEntrega"), Bytes.toBytes(f.IdCompraEntrega.toString()))
          put.addColumn(Bytes.toBytes("dados"), Bytes.toBytes("IdFreteEntregaTipo"), Bytes.toBytes(f.IdFreteEntregaTipo.toString()))

         table.put(put)
        }   
      }
    }

    ssc.start()
    ssc.awaitTermination()

  }
}

В spark docs я не очень хорошо понял!Пожалуйста, помогите мне!

Пока

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...