Spark Streaming присоединиться Kafka темы сравнения - PullRequest
0 голосов
/ 07 января 2019

Нам нужно было реализовать объединение по темам Kafka с учетом поздних данных или «не в соединении», что означает, что данные, которые поступают поздно в потоке или не присоединяются, не будут отброшены / потеряны, но будут помечены как тайм-аут,

результат объединения генерируется для вывода темы Кафки (с таймаутом, если он произошел).

(спарк 2.1.1 в автономном развертывании, Кафка 10)

Кафка в темах: X, Y, ... результат из тем будет выглядеть так:

{
    "keyJoinFiled": 123456,
    "xTopicData": {},
    "yTopicData": {},
    "isTimeOutFlag": true
}

Я нашел три решения, написанные здесь, 1 и 2 из официальной документации по потоковой передаче, но они не имеют отношения к нам (данные, не вошедшие в Dtsream, приходят с опозданием в рабочее время, пропущены / потеряны), но я написал их для сравнения .

Из того, что мы увидели, примеров присоединения к Kafka с помощью операции с состоянием не так уж и много, добавьте здесь код для обзора:

1) В соответствии с документацией о потоке искр,

https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html:   
 val stream1: DStream[String, String] = 
 val stream2: DStream[String, String] = 
 val joinedStream = stream1.join(stream2)

Это объединит данные из обоих пакетов потоков, но данные, поступившие «в рабочее время» поздно / не в соединении, будут отброшены / потеряны.

2) Соединение окон:

val leftWindowDF = kafkaStreamLeft.window(Minutes(input_parameter_time))
val rightWindowDF = kafkaStreamRight.window(Minutes(input_parameter_time))
leftWindowDF.join(rightWindowDF).foreachRDD...

2.1) В нашем случае нам нужно использовать окно Tumbling для интервал между потоками искрового потока. 2.2) Необходимо сохранить много данных в памяти / диске, например, 30-60 мин. окно 2.3) И снова данные поступают поздно / не в окно / не в соединение упал / потерял. * Начиная с версии 2.3.1 структурированного потокового потока для объединения потоков поддерживается, но мы сталкиваемся с ошибкой не очистки состояния HDFS магазин, в результате, работа каждые несколько часов падала на ООМ, решено в 2.4 https://issues.apache.org/jira/browse/SPARK-23682 (использование хранилища состояний Rocksdb или CustomStateStoreProvider HDFS).

3) Использование mapful-режима работы с состоянием для присоединения к темам Kafka Dstreams с переворачивающимся окном и 30-минутным таймаутом для поздних данных, все данные, созданные для вывода тем, содержат присоединенные сообщения от всех темы, если произошло соединение, или часть данных темы, если нет соединение произошло через 30 минут (помечено флагом is_time_out)

3.1) Создание 1..n Dstream по теме, конвертировать в значение Key / Unioned записи с объединением подаются в виде ключа и падающего окна. создание всеобъемлющей схемы. 3.2) Объединение всех потоков 3.3) Запустить на объединении потока mapWithState с функцией - фактически выполнит Тайм-аут присоединения / отметки.

Отличный пример объединения с сохранением состояния из блоков данных (spark 2.2.0): https://www.youtube.com/watch?time_continue=1858&v=JAb4FIheP28

Добавление примера кода, который выполняется / тестируется.

 val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "session.timeout.ms" -> "30000"
  )

  //Kafka xTopic DStream
  val kafkaStreamLeft = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](leftTopic.split(",").toSet, kafkaParams)
  ).map(record => {
    val msg:xTopic = gson.fromJson(record.value(),classOf[xTopic])
    Unioned(Some(msg),None,if (msg.sessionId!= null) msg.sessionId.toString else "")
  }).window(Minutes(leftWindow),Minutes(leftWindow))

  //Kafka yTopic DStream
  val kafkaStreamRight = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](rightTopic.split(",").toSet, kafkaParams)
  ).map(record => {
    val msg:yTopic = gson.fromJson(record.value(),classOf[yTopic])
    Unioned(None,Some(msg),if (msg.sessionId!= null) msg.sessionId.toString else "")
  }).window(Minutes(rightWindow),Minutes(rightWindow))

  //convert stream to key, value pair and filter empty session id.
  val unionStream = kafkaStreamLeft.union(kafkaStreamRight).map(record =>(record.sessionId,record))
    .filter(record => !record._1.toString.isEmpty)
  val stateSpec = StateSpec.function(stateUpdateF).timeout(Minutes(timeout.toInt))

  unionStream.mapWithState(stateSpec).foreachRDD(rdd => {
    try{
      if(!rdd.isEmpty()) rdd.foreachPartition(partition =>{
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        //send to kafka result JSON.
        partition.foreach(record => {
          if(record!=null && !"".equals(record) && !"()".equals(record.toString) && !"None".equals(record.toString) ){
            producer.send(new ProducerRecord[String, String](outTopic, null, gson.toJson(record)))
          }
        })
        producer.close()
      })
    }catch {
      case e: Exception  => {
        logger.error(s""""error join topics :${leftTopic} ${rightTopic} to out topic ${outTopic}""")
        logger.info(e.printStackTrace())
      }
    }})

//mapWithState function that will be called on each key occurrence with new items in newItemValues and state items if exits.

def stateUpdateF = (keySessionId:String,newItemValues:Option[Unioned],state:State[Unioned])=> {
    val currentState = state.getOption().getOrElse(Unioned(None,None,keySessionId))

    val newVal:Unioned = newItemValues match {
      case Some(newItemValue) => {
        if (newItemValue.yTopic.isDefined)
          Unioned(if(newItemValue.xTopic.isDefined) newItemValue.xTopic else currentState.xTopic,newItemValue.yTopic,keySessionId)
        else if (newItemValue.xTopic.isDefined)
          Unioned(newItemValue.xTopic, if(currentState.yTopic.isDefined)currentState.yTopic else newItemValue.yTopic,keySessionId)
        else newItemValue
      }
      case _ => currentState //if None = timeout => currentState
    }

    val processTs = LocalDateTime.now()
    val processDate = dtf.format(processTs)
    if(newVal.xTopic.isDefined && newVal.yTopic.isDefined){//if we have a join remove from state
      state.remove()
      JoinState(newVal.sessionId,newVal.xTopic,newVal.yTopic,false,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
    }else if(state.isTimingOut()){//time out do no try to remove state manually ,it's removed automatically.
        JoinState(newVal.sessionId, newVal.xTopic, newVal.yTopic,true,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
    }else{
      state.update(newVal)
    }
  }

  //case class for kafka topics data.(x,y topics ) join will be on session id filed.
  case class xTopic(sessionId:String,param1:String,param2:String,sessionCreationDate:String)
  case class yTopic(sessionId:Long,clientTimestamp:String)
  //catch all schema : object that contains both kafka input fileds topics and key valiue for join.
  case class Unioned(xTopic:Option[xTopic],yTopic:Option[yTopic],sessionId:String)
  //class for  output result of join stateful function.
  case class JoinState(sessionId:String, xTopic:Option[xTopic],yTopic:Option[yTopic],isTimeOut:Boolean,processTs:Long,processDate:String)

Буду рад некоторому обзору. Простите за длинный пост.

1 Ответ

0 голосов
/ 09 января 2019

У меня сложилось впечатление, что этот вариант использования был решен с помощью API Sessionization ?:

StructuredSessionization.scala

И Операции с сохранением состояния в структурированной потоковой передаче

Или я что-то упустил?

...