Подсчитывать сообщения из темы кафки весь час - PullRequest
0 голосов
/ 17 мая 2018

Я хотел бы посчитать сообщения, приходящие из темы какфа.

Например, у меня есть этот класс дел:

case class Message(timestamp: LocalDateTime)

Я получаю сообщения этого класса, и я хотел бы сосчитатьсколько сообщений у меня в течение 1 часа.Предположим, что в этой теме упорядочены сообщения (временная метка соответствует тому, когда сообщение вводится в теме).

Я хотел бы создать класс дел, подобный этому:

case class Counter(datetime: LocalDateTime, count: Int)

Позвольте мне сказатьу меня будет 100 сообщений в первый час, затем у меня будет 150:

Counter("2018-05-17 00:00:00", 100)
Counter("2018-05-17 00:01:00", 150)

Есть идеи, как это сделать?Для информации я не могу / не хочу использовать kafka-streams.

Спасибо!

РЕДАКТИРОВАТЬ:

Мой источник - тема kafka, которую я хотел быиспользовать с Consumer API.Моя раковина - стол postgresql.

Ответы [ 2 ]

0 голосов
/ 18 мая 2018

Я разобрался с решением с Flink.

Я прочитал некоторую документацию по временному окну во Флинке, и на этой странице говорится о восходящей отметке времени в теме (это мой случай).

Итак, вот решение:

  val inputStream: DataStream[Message] = env.addSource(kafkaConsumer)
  val timedStream: DataStream[Message] = inputStream
    .assignAscendingTimestamps(_.timestamp)
  val timeWindow = timedStream.timeWindowAll(Time.minutes(1)).sum(1)

Подсчитывает все элементы в падающем окне за 1 минуту.

Для более конкретного решения и получения Counter("2018-05-17 00:00:00", 100) мы должны расширить AllWindowFunction:

  class CustomWindowFunction extends AllWindowFunction[Message, Counter, TimeWindow] {
  def apply(window: TimeWindow, input: Iterable[Message], out: Collector[Counter]): Unit = {
    out.collect(
      Counter(
        new LocalDateTime(window.getStart),
        input.size
      )
    )
  }
}

А затем примените его к нашему timeStream:

  val inputStream: DataStream[MyClass] = env.addSource(kafkaConsumer)
  val timedStream: DataStream[MyClass] = inputStream
    .assignAscendingTimestamps(_.timestamp)
  val timeWindow = timedStream.timeWindowAll(Time.minutes(1)).apply(new CustomWindowFunction())

Если в нашей теме ввода у нас есть класс Message, мы получаем класс Counter в конце.

Это "лучшее" решение, которое я нашел на данный момент.

0 голосов
/ 18 мая 2018

Требуемое решение обычно называется windowing в терминах потоковой обработки, и большинство библиотек потоковой обработки имеют это в качестве функции. хорошая запись от Software Mill, сравнивающая Spark Streaming, Flink, Kafka Streams и Akka Streams.

Вы можете попытаться реализовать это самостоятельно, но все библиотеки, упомянутые выше, протестированы в бою и имеют простые, читаемые API.Если вы не хотите использовать Kafka Streams, то стоит подумать о Akka Streams Kafka , упомянутом в одном из комментариев (часть проекта Alpakka ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...