Искровой стрим от кафки, как указать время отключения для опрошенных событий - PullRequest
0 голосов
/ 11 января 2019

У меня есть приложение для потокового воспроизведения, которое запускается в конце дня и использует события kafka, отправленные вышестоящим приложением. В настоящее время вышестоящее приложение продолжает загружать новые данные в течение всего дня, и мой потребитель в конечном итоге потребляет их. Я хочу ограничить количество потребляемых событий на основе отсечки, скажем, 6 вечера в день. Есть ли способ указать отсечение, чтобы ограничить количество потребляемых событий на основе отсечки, скажем, отметки времени события kafka или чего-то еще. Ниже приведен код потребителя

  KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))

Ответы [ 3 ]

0 голосов
/ 11 января 2019

Когда вы объявляете контекст потоковой передачи, мы можем упомянуть время отключения для создания dsstream и передать это значение параметру createDirectStream. Пожалуйста, найдите кодовый снимок. В приведенном ниже коде 5 секунд как время отключения. поэтому каждые 5 секунд будет создаваться DStream RDD.

sc = spark.sparkContext
ssc = StreamingContext(sc,5)
kvs = KafkaUtils.createDirectStream(ssc, ['Topic-name'], {"metadata.broker.list": 'Server-name:port-number'},valueDecoder=serializer.decode_message)
0 голосов
/ 14 января 2019

Это решение, которое я реализовал

1: сохранить текущее время в переменной при запуске задания потоковой передачи

val cuttoffTime = System.currentTimeMillis ()

2: Создать DirectStream

val directKafkaStream=   KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))

3: применить критерии фильтра В цикле foreach примените критерии фильтра, как показано ниже

directKafkaStream.foreachRDD {rdd => val FilterRdd = rdd.filter (_. timestamp ()

0 голосов
/ 11 января 2019

Вы можете просто отфильтровать события во время обработки на основе timeStamp или time или любого поля. Например, давайте предположим, что ваше событие - JSON, и у него есть поле с именем hour, которое является значением часа времени события. Вы можете легко выбрать только событие, которое было создано до 6, как показано ниже.

directStream.foreachRDD { rdd =>
        val eventDfRDD = rdd.filter(record => {
          val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
          option.get("hour") < 1800
        })
      }
...