Нам нужно было реализовать объединение по темам Kafka с учетом поздних данных или «не в соединении», что означает, что данные, которые поступают поздно в потоке или не присоединяются, не будут отброшены / потеряны, но будут помечены как тайм-аут,
результат объединения генерируется для вывода темы Кафки (с таймаутом, если он произошел).
(спарк 2.1.1 в автономном развертывании, Кафка 10)
Кафка в темах: X, Y, ... результат из тем будет выглядеть так:
{
"keyJoinFiled": 123456,
"xTopicData": {},
"yTopicData": {},
"isTimeOutFlag": true
}
Я нашел три решения, написанные здесь, 1 и 2 из официальной документации по потоковой передаче, но они не имеют отношения к нам (данные, не вошедшие в Dtsream, приходят с опозданием в рабочее время, пропущены / потеряны), но я написал их для сравнения .
Из того, что мы увидели, примеров присоединения к Kafka с помощью операции с состоянием не так уж и много, добавьте здесь код для обзора:
1) В соответствии с документацией о потоке искр,
https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html:
val stream1: DStream[String, String] =
val stream2: DStream[String, String] =
val joinedStream = stream1.join(stream2)
Это объединит данные из обоих пакетов потоков, но данные, поступившие «в рабочее время» поздно / не в соединении, будут отброшены / потеряны.
2) Соединение окон:
val leftWindowDF = kafkaStreamLeft.window(Minutes(input_parameter_time))
val rightWindowDF = kafkaStreamRight.window(Minutes(input_parameter_time))
leftWindowDF.join(rightWindowDF).foreachRDD...
2.1) В нашем случае нам нужно использовать окно Tumbling для
интервал между потоками искрового потока.
2.2) Необходимо сохранить много данных в памяти / диске, например, 30-60 мин.
окно
2.3) И снова данные поступают поздно / не в окно / не в соединение
упал / потерял.
* Начиная с версии 2.3.1 структурированного потокового потока для объединения потоков
поддерживается, но мы сталкиваемся с ошибкой не очистки состояния HDFS
магазин, в результате, работа каждые несколько часов падала на ООМ,
решено в 2.4
https://issues.apache.org/jira/browse/SPARK-23682
(использование хранилища состояний Rocksdb или CustomStateStoreProvider HDFS).
3) Использование mapful-режима работы с состоянием для присоединения к темам Kafka Dstreams
с переворачивающимся окном и 30-минутным таймаутом для поздних данных,
все данные, созданные для вывода тем, содержат присоединенные сообщения от всех
темы, если произошло соединение, или часть данных темы, если нет
соединение произошло через 30 минут (помечено флагом is_time_out)
3.1) Создание 1..n Dstream по теме, конвертировать в значение Key / Unioned
записи с объединением подаются в виде ключа и падающего окна.
создание всеобъемлющей схемы.
3.2) Объединение всех потоков
3.3) Запустить на объединении потока mapWithState с функцией - фактически выполнит
Тайм-аут присоединения / отметки.
Отличный пример объединения с сохранением состояния из блоков данных (spark 2.2.0):
https://www.youtube.com/watch?time_continue=1858&v=JAb4FIheP28
Добавление примера кода, который выполняется / тестируется.
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"session.timeout.ms" -> "30000"
)
//Kafka xTopic DStream
val kafkaStreamLeft = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](leftTopic.split(",").toSet, kafkaParams)
).map(record => {
val msg:xTopic = gson.fromJson(record.value(),classOf[xTopic])
Unioned(Some(msg),None,if (msg.sessionId!= null) msg.sessionId.toString else "")
}).window(Minutes(leftWindow),Minutes(leftWindow))
//Kafka yTopic DStream
val kafkaStreamRight = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](rightTopic.split(",").toSet, kafkaParams)
).map(record => {
val msg:yTopic = gson.fromJson(record.value(),classOf[yTopic])
Unioned(None,Some(msg),if (msg.sessionId!= null) msg.sessionId.toString else "")
}).window(Minutes(rightWindow),Minutes(rightWindow))
//convert stream to key, value pair and filter empty session id.
val unionStream = kafkaStreamLeft.union(kafkaStreamRight).map(record =>(record.sessionId,record))
.filter(record => !record._1.toString.isEmpty)
val stateSpec = StateSpec.function(stateUpdateF).timeout(Minutes(timeout.toInt))
unionStream.mapWithState(stateSpec).foreachRDD(rdd => {
try{
if(!rdd.isEmpty()) rdd.foreachPartition(partition =>{
val props = new util.HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
//send to kafka result JSON.
partition.foreach(record => {
if(record!=null && !"".equals(record) && !"()".equals(record.toString) && !"None".equals(record.toString) ){
producer.send(new ProducerRecord[String, String](outTopic, null, gson.toJson(record)))
}
})
producer.close()
})
}catch {
case e: Exception => {
logger.error(s""""error join topics :${leftTopic} ${rightTopic} to out topic ${outTopic}""")
logger.info(e.printStackTrace())
}
}})
//mapWithState function that will be called on each key occurrence with new items in newItemValues and state items if exits.
def stateUpdateF = (keySessionId:String,newItemValues:Option[Unioned],state:State[Unioned])=> {
val currentState = state.getOption().getOrElse(Unioned(None,None,keySessionId))
val newVal:Unioned = newItemValues match {
case Some(newItemValue) => {
if (newItemValue.yTopic.isDefined)
Unioned(if(newItemValue.xTopic.isDefined) newItemValue.xTopic else currentState.xTopic,newItemValue.yTopic,keySessionId)
else if (newItemValue.xTopic.isDefined)
Unioned(newItemValue.xTopic, if(currentState.yTopic.isDefined)currentState.yTopic else newItemValue.yTopic,keySessionId)
else newItemValue
}
case _ => currentState //if None = timeout => currentState
}
val processTs = LocalDateTime.now()
val processDate = dtf.format(processTs)
if(newVal.xTopic.isDefined && newVal.yTopic.isDefined){//if we have a join remove from state
state.remove()
JoinState(newVal.sessionId,newVal.xTopic,newVal.yTopic,false,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
}else if(state.isTimingOut()){//time out do no try to remove state manually ,it's removed automatically.
JoinState(newVal.sessionId, newVal.xTopic, newVal.yTopic,true,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
}else{
state.update(newVal)
}
}
//case class for kafka topics data.(x,y topics ) join will be on session id filed.
case class xTopic(sessionId:String,param1:String,param2:String,sessionCreationDate:String)
case class yTopic(sessionId:Long,clientTimestamp:String)
//catch all schema : object that contains both kafka input fileds topics and key valiue for join.
case class Unioned(xTopic:Option[xTopic],yTopic:Option[yTopic],sessionId:String)
//class for output result of join stateful function.
case class JoinState(sessionId:String, xTopic:Option[xTopic],yTopic:Option[yTopic],isTimeOut:Boolean,processTs:Long,processDate:String)
Буду рад некоторому обзору.
Простите за длинный пост.