Чтение данных, удержание данных в течение N секунд, запись данных (Кафка, Flink) - PullRequest
0 голосов
/ 05 апреля 2019

Приложение Читает из темы кафки.Каждое сообщение должно быть уникальным (дубликаты игнорируются) содержит данные за 'N' секунд и записывает в разные темы кафки как отдельные сообщения

Есть ли способ удержать сообщение в течение 'N секунд' и записать в кафку Каждыйсообщение должно быть записано в ту же тему через N секунд после того, как оно пришло.

В настоящее время я храню данные в структуре json в памяти и каждый раз, когда приходит сообщение, я перебираювсе сообщения, которые у меня есть, и время сравнения.

Естественно, это не способ сделать это

val some_consumer= new FlinkKafkaConsumer09(data_topic
      , new JSONKeyValueDeserializationSchema(false), properties)
    some_consumer.setStartFromLatest()
    val in_stream = env.addSource(some_consumer)
      .filter(!_.isNull)
      .map(x => processMessage(x))
def process(x: ObjectNode){
 // store message in json if not existing
 // loop through entire set and compare times
 // if after 'N' seconds
   // write to kafka
    kafka_producer.send(new ProducerRecord[String, String](output_topic, the_unique_message))


}

Ответы [ 2 ]

3 голосов
/ 05 апреля 2019

Вы должны держать сообщения в состоянии Flink, чтобы они были отмечены контрольными точками и были восстановлены в случае сбоев.

Чтобы дедуплицировать поток, вы можете назначить поток любым атрибутом, делающим событие уникальным, т. Е. keyBy(x -> x.uniqueId). Тогда я бы использовал KeyedProcessFunction и буферизовал первое событие для каждого ключа в ValueState<Event>. Вы можете использовать EventTimeTimer или ProcessingTimeTimer для запуска отправки события (в зависимости от того, что подходит). Если область дедупликации составляет N секунд, вы можете очистить состояние одновременно с отправкой события.

0 голосов
/ 05 апреля 2019

Вы можете использовать Tumbling Windows https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#tumbling-windows

    .window(TumblingEventTimeWindows.of(Time.seconds(5)))

Приведенный выше пример означает, что данные выходят каждые 5 секунд, и вы можете ясно видеть их при печати на консоль

в вашем случае вам не нужно EventTime и вы можете использовать ProcessingTime.Также вам не нужен keyBy (), просто используйте AllWindow, хотя неплохо использовать keyBy (), поэтому вы получаете параллелизм

после window(), вы можете вызвать FlinkKafkaSink.Поскольку это окно будет периодически генерировать события каждые X минут / секунд по вашему желанию

Вы можете быть осторожны с ограничением памяти, потому что данные, которые хранятся в окне, хранятся в памяти

...