Это решение, которое я реализовал
1: сохранить текущее время в переменной при запуске задания потоковой передачи
val cuttoffTime = System.currentTimeMillis ()
2: Создать DirectStream
val directKafkaStream= KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
3: применить критерии фильтра
В цикле foreach примените критерии фильтра, как показано ниже
directKafkaStream.foreachRDD {rdd =>
val FilterRdd = rdd.filter (_. timestamp ()