Приложение Читает из темы кафки.Каждое сообщение должно быть уникальным (дубликаты игнорируются) содержит данные за 'N' секунд и записывает в разные темы кафки как отдельные сообщения
Есть ли способ удержать сообщение в течение 'N секунд' и записать в кафку Каждыйсообщение должно быть записано в ту же тему через N секунд после того, как оно пришло.
В настоящее время я храню данные в структуре json в памяти и каждый раз, когда приходит сообщение, я перебираювсе сообщения, которые у меня есть, и время сравнения.
Естественно, это не способ сделать это
val some_consumer= new FlinkKafkaConsumer09(data_topic
, new JSONKeyValueDeserializationSchema(false), properties)
some_consumer.setStartFromLatest()
val in_stream = env.addSource(some_consumer)
.filter(!_.isNull)
.map(x => processMessage(x))
def process(x: ObjectNode){
// store message in json if not existing
// loop through entire set and compare times
// if after 'N' seconds
// write to kafka
kafka_producer.send(new ProducerRecord[String, String](output_topic, the_unique_message))
}