Вы можете использовать mapWithState
. Есть хорошее руководство по использованию Stateful Streaming .
В вашем случае вы могли бы:
1.Установите контрольную точку:
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("path/to/persistent/storage")
2. Определить функцию обновления:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
# you can update your state in any value you want
# it is just a marker that value not new
state.update(value.get)
Option((key, v))
case (_, _) if state.isTimingOut() => None
}
}
3.Создать состояние спецификации:
val stateSpec =
StateSpec
.function(update _)
# it is important to define how long
# you want to check duplication
# in this example check interval is 1 second.
.timeout(Seconds(1))
4.Используйте его:
ks
# make key->value pairs
.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
.mapWithState(stateSpec)
Если вы хотите взять последнее из значений, функция обновления может быть:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
state.update(value.get)
None
case (_, _) if state.isTimingOut() => Option((key, value.get))
}
}