Как сократить время, необходимое для экономии времени от искры до базы на SHC? - PullRequest
0 голосов
/ 22 февраля 2019

Я использую Kafka для передачи данных, потоковую передачу Spark для получения данных и SHC (Spark-Hbase Connector / Hortonworks) для сохранения данных в HBase.

Но у меня проблема с сохранением данных.Сохранение данных занимает много времени.Таким образом, проблема заключается в буферизации.

вот мой код.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
import spark.sqlContext.implicits._

import java.time.{Instant, ZoneId, ZonedDateTime}
import java.time.format.{ DateTimeFormatter, DateTimeParseException }

val ssc = new StreamingContext(sc, Seconds(1))

val topics = "sensor_rpi"

val topicsSet = topics.split(",").toSet
val brokers = "sdc4:9092,sdc5:9092,sdc6:9092,sdc7:9092"

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](
      ssc, kafkaParams, topicsSet).map(_._2)

val words = messages.flatMap(_.split("[=]+"))

def catalog = s"""{
     "table":{"namespace":"default", "name":"sensor_rpi"},
     "rowkey":"key",
     "columns":{
     "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
     "X":{"cf":"DATA", "col":"X", "type":"int"},
     "Y":{"cf":"DATA", "col":"Y", "type":"int"},
     "Z":{"cf":"DATA", "col":"Z", "type":"int"}
     }
}""".stripMargin

case class Sensor(
rowkey:String,
X:Int,
Y:Int,
Z:Int
) extends Serializable


var sensor=words.map{line=>
val str = line.split('|')
val dateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")
Sensor(str(3)+"."+str(4),str(0).toInt,str(1).toInt,str(2).toInt)
}
sensor.foreachRDD{rdd=>
sc.parallelize(rdd.collect())
 .toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> 
 catalog,HBaseTableCatalog.newTable- 
 >"5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
}
ssc.start()

, когда я удаляю функцию foreachRDD, тогда нет никакой задержки между вводом (kafka) и выводом (Hbase).Я не знаю, что мне делать ...

пожалуйста, дайте мне подсказку ...

...