Агрегирование данных в реальном времени в KStreams - PullRequest
0 голосов
/ 10 октября 2018

Я хочу суммировать данные одного столбца на основе указанного ключа.Поток похож на id (String) Ключ, значение (Long).

val aggtimelogs: KTable[String, java.lang.Long] = stream
      .groupByKey()
      .aggregate(
               () => 0L,
              (key: String, value: java.lang.Long, aggregate: java.lang.Long) => value + aggregate)

// Неудача здесь

Получение

Unspecified value parameters: : Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]

Как это сделать в Scala?

Кафка версия

compile "org.apache.kafka:kafka-clients:2.0.0"
compile (group: "org.apache.kafka", name: "kafka-streams", version: "2.0.0"){
    exclude group:"com.fasterxml.jackson.core"
}

Даже я пробовал это

val reducer = new Reducer[java.lang.Long]() {
      def apply(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
    }

    val agg = stream
          .groupByKey()
          .reduce(reducer)

Также этот

val reducer : Reducer[Long] = (value1: Long, value2: Long) => value1 + value2

Говорит

StreamAggregation.scala:39: type mismatch;
 found   : (Long, Long) => Long
 required: org.apache.kafka.streams.kstream.Reducer[Long]
    val reducer : Reducer[Long] = (value1: Long, value2: Long) => value1 + value2

1 Ответ

0 голосов
/ 10 октября 2018

Я сделал это так

val aggVal = streams.groupByKey().reduce(new Reducer[Double]() {
    def apply(val1: Double, val2: Double): Double = val1 + val2
  })
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...