Я хочу показать вам простой пример использования библиотеки com.typesafe.config
Это mi application.properties в каталоге ресурсов.
## Structured Streaming device
device.zookeeper = quickstart.cloudera:2181
device.bootstrap.server = quickstart.cloudera:9092
device.topic = device
device.execution.mode = local
device.data.host = quickstart.cloudera
device.data.port = 44444
## HBase
device.zookeeper.quorum = quickstart.cloudera
device.zookeeper.port = 2181
device.window = 1
и это код для получения свойств, args (0) == device
def main(args: Array[String]): Unit = {
val conf = ConfigFactory.load // get Confs
val envProps: Config = conf.getConfig(args(0)) // args(0) == device
val sparkConf = new SparkConf().setMaster(envProps.getString("execution.mode")).setAppName("Device Signal") // get execution.mode conf
val streamingContext = new StreamingContext(sparkConf, Seconds(envProps.getInt("window"))) // get window conf
streamingContext.sparkContext.setLogLevel("ERROR")
val broadcastConfig = streamingContext.sparkContext.broadcast(envProps)
val topicsSet = Set(envProps.getString("topic")) // get topic conf
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> envProps.getString("bootstrap.server"), // get bootstrap.server conf
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val logData: DStream[String] = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams)
).map(record =>{
record.value
})
Надеюсь, это может быть печально.
Привет.