это рабочий пример, основанный на записи в блоге Шаблоны интеграции Spark и Kafka .
package org.sense.spark.app
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.fusesource.mqtt.client.QoS
import org.sense.spark.util.{MqttSink, TaxiRideSource}
object TaxiRideCountCombineByKey {
val mqttTopic: String = "spark-mqtt-sink"
val qos: QoS = QoS.AT_LEAST_ONCE
def main(args: Array[String]): Unit = {
val outputMqtt: Boolean = if (args.length > 0 && args(0).equals("mqtt")) true else false
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 4 cores to prevent from a starvation scenario.
val sparkConf = new SparkConf()
.setAppName("TaxiRideCountCombineByKey")
.setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val stream = ssc.receiverStream(new TaxiRideSource())
val driverStream = stream.map(taxiRide => (taxiRide.driverId, 1))
val countStream = driverStream.combineByKey(
(v) => (v, 1), //createCombiner
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
new HashPartitioner(3)
)
if (outputMqtt) {
println("Use the command below to consume data:")
println("mosquitto_sub -h 127.0.0.1 -p 1883 -t " + mqttTopic)
val mqttSink = ssc.sparkContext.broadcast(MqttSink())
countStream.foreachRDD { rdd =>
rdd.foreach { message =>
mqttSink.value.send(mqttTopic, message.toString()) // "send" method does not exist
}
}
} else {
countStream.print()
}
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}
package org.sense.spark.util
import org.fusesource.mqtt.client.{FutureConnection, MQTT, QoS}
class MqttSink(createProducer: () => FutureConnection) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, message: String): Unit = {
producer.publish(topic, message.toString().getBytes, QoS.AT_LEAST_ONCE, false)
}
}
object MqttSink {
def apply(): MqttSink = {
val f = () => {
val mqtt = new MQTT()
mqtt.setHost("localhost", 1883)
val producer = mqtt.futureConnection()
producer.connect().await()
sys.addShutdownHook {
producer.disconnect().await()
}
producer
}
new MqttSink(f)
}
}
package org.sense.spark.util
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.zip.GZIPInputStream
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
case class TaxiRide(rideId: Long, isStart: Boolean, startTime: DateTime, endTime: DateTime,
startLon: Float, startLat: Float, endLon: Float, endLat: Float,
passengerCnt: Short, taxiId: Long, driverId: Long)
object TimeFormatter {
val timeFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC()
}
class TaxiRideSource extends Receiver[TaxiRide](StorageLevel.MEMORY_AND_DISK_2) {
val dataFilePath = "/home/flink/nycTaxiRides.gz";
var dataRateListener: DataRateListener = _
/**
* Start the thread that receives data over a connection
*/
def onStart() {
dataRateListener = new DataRateListener()
dataRateListener.start()
new Thread("TaxiRide Source") {
override def run() {
receive()
}
}.start()
}
def onStop() {}
/**
* Periodically generate a TaxiRide event and regulate the emission frequency
*/
private def receive() {
while (!isStopped()) {
val gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath))
val reader: BufferedReader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8))
try {
var line: String = null
do {
// start time before reading the line
val startTime = System.nanoTime
// read the line on the file and yield the object
line = reader.readLine
if (line != null) {
val taxiRide: TaxiRide = getTaxiRideFromString(line)
store(taxiRide)
}
// regulate frequency of the source
dataRateListener.busySleep(startTime)
} while (line != null)
} finally {
reader.close
}
}
}
def getTaxiRideFromString(line: String): TaxiRide = {
// println(line)
val tokens: Array[String] = line.split(",")
if (tokens.length != 11) {
throw new RuntimeException("Invalid record: " + line)
}
val rideId: Long = tokens(0).toLong
val (isStart, startTime, endTime) = tokens(1) match {
case "START" => (true, DateTime.parse(tokens(2), TimeFormatter.timeFormatter), DateTime.parse(tokens(3), TimeFormatter.timeFormatter))
case "END" => (false, DateTime.parse(tokens(2), TimeFormatter.timeFormatter), DateTime.parse(tokens(3), TimeFormatter.timeFormatter))
case _ => throw new RuntimeException("Invalid record: " + line)
}
val startLon: Float = if (tokens(4).length > 0) tokens(4).toFloat else 0.0f
val startLat: Float = if (tokens(5).length > 0) tokens(5).toFloat else 0.0f
val endLon: Float = if (tokens(6).length > 0) tokens(6).toFloat else 0.0f
val endLat: Float = if (tokens(7).length > 0) tokens(7).toFloat else 0.0f
val passengerCnt: Short = tokens(8).toShort
val taxiId: Long = tokens(9).toLong
val driverId: Long = tokens(10).toLong
TaxiRide(rideId, isStart, startTime, endTime, startLon, startLat, endLon, endLat, passengerCnt, taxiId, driverId)
}
}