Фильтрация частичных дубликатов с mapWithState Spark Streaming - PullRequest
0 голосов
/ 02 мая 2018

У нас есть DStream, например

val ssc = new StreamingContext(sc, Seconds(1))

val kS = KafkaUtils.createDirectStream[String, TMapRecord](
  ssc,
  PreferConsistent,
  Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
  mapPartitions(part => {
    part.map(_.value())
  }).
  mapPartitions(part1 => {
    part1.map(c => {
      TMsg(1,
        c.field1,
        c.field2, //And others
        c.startTimeSeconds
      )
    })
  })

Таким образом, каждый RDD имеет набор TMsg объектов с некоторыми (техническими) ключевыми полями, которые я могу использовать для дедупликации DStream. По сути, , если у нас есть два объекта TMsg В ОДНОЙ ИЛИ ДВУХ ДИСКРЕТИЗИРОВАННЫХ СДР с одинаковыми field1 и field2, и они отличаются менее чем на 1 секунду (мы смотрим на startTimeSeconds), это дублируют .

Я просмотрел mapWithState. Да, я могу создать K -> V DStream как

val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)

Так что я могу использовать функцию, но не понимаю, как я могу использовать ее для фильтрации дубликатов.

Оконная функция не может помочь, и я не могу использовать функцию (структурированный поток) .deduplicate, поскольку решение написано в DStreams.

Есть какие-нибудь решения? Спасибо

P.S. Версия Spark 2.2

1 Ответ

0 голосов
/ 03 мая 2018

Вы можете использовать mapWithState. Есть хорошее руководство по использованию Stateful Streaming . В вашем случае вы могли бы:

1.Установите контрольную точку:

val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("path/to/persistent/storage")

2. Определить функцию обновления:

def update(key: (String, String),
           value: Option[Int],
           state: State[Int]): Option[((String, String), Int)] = {
  (value, state.getOption()) match {
    case (Some(_), Some(_)) => None
    case (Some(v), _) =>
      # you can update your state in any value you want
      # it is just a marker that value not new
      state.update(value.get)
      Option((key, v))
    case (_, _) if state.isTimingOut() => None
  }
}

3.Создать состояние спецификации:

val stateSpec =
  StateSpec
    .function(update _)
    # it is important to define how long 
    # you want to check duplication
    # in this example check interval is 1 second.
    .timeout(Seconds(1))

4.Используйте его:

ks
  # make key->value pairs
  .map(m => (m.field1, m.field2) -> m.startTimeSeconds)
  .mapWithState(stateSpec)

Если вы хотите взять последнее из значений, функция обновления может быть:

  def update(key: (String, String),
                       value: Option[Int],
                       state: State[Int]): Option[((String, String), Int)] = {
    (value, state.getOption()) match {
      case (Some(_), Some(_)) => None
      case (Some(v), _) =>
        state.update(value.get)
        None
      case (_, _) if state.isTimingOut() => Option((key, value.get))
    }
  }
...