Получение неопределенного значения параметра scala Kafka - PullRequest
0 голосов
/ 08 сентября 2018

Привет, я получаю эту ошибку компиляции -

Unspecified Value parameters: aggregator: (String, String, NotInferedVR) => NotInferedVR

Но у меня уже есть агрегатор. Кто-нибудь знает, что происходит?

 val stream = builder
    .stream(inputTopic)(Consumed.`with`(Serdes.String(), Serdes.ByteArray()))
    .map{ (key: String, value: Array[Byte]) =>
      println(s"key = ${key}")
      val lv = GroupByAction.convertByteArrayToJsonObject(value)
      val lst = List.empty[String]
      val newKey = GroupByAction
        .reKey(lv
          , groupByColumnList
            .asScala
            .toList
          ,lst)
      val newValue = getValFromJSONMessage(lv, aggregateColumnList.asScala.toList.head)
      println(s"newKey = ${newKey}")
      (newKey, newValue)}
    .groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
    .aggregate{ () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString }

1 Ответ

0 голосов
/ 08 сентября 2018

Вы можете использовать только форму вызова метода {} для передачи одного параметра, который рассматривается как блок. Вам нужно передать два параметра, поэтому используйте () вместо:

aggregate( () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString )
...