Агрегат KStream в Скала - PullRequest
       57

Агрегат KStream в Скала

0 голосов
/ 15 октября 2018

Если у меня есть записи ниже

{"A",7}
{"B",10}
{"C",10}

Тогда агрегация должна быть

{"sum_ABC",27}

Затем, когда A изменяется на 10, т.е. добавляется новое сообщение

{"A",10}

Теперь он должен вычисляться как

{"sum_ABC",30}


val record: KTable[String, JsonNode] = builder.table("logs",m_consumed)
val aggVal: KTable[String, Double] = record.toStream().groupByKey()
        .reduce(new Reducer[Double]() {
        def apply(val1: Double, val2: Double): Double =
          {
            println(val1)
            val1 + val2
          }
      })

Это не работает. Он продолжает добавлять значения, а при перезапуске напрямую добавляет 0 + новое значение.До сих пор я понял, что мне нужно использовать агрегат, а не сокращение.Пожалуйста, руководство, как это сделать.Любые ссылки или учебник?

1 Ответ

0 голосов
/ 16 октября 2018

Агрегации всегда основаны на ключах.Таким образом, если вы хотите агрегировать сообщения с разными ключами (в вашем примере A,B,C), вам нужно установить новый ключ для всех сообщений, чтобы сопоставить их с одним ключом.

Кроме того, чтобы получить семантику обновления, вам нужно применить агрегацию на KTable.В вашем примере, чтобы преобразовать KTable в KStream (через toStream()) и, таким образом, изменить семантику с «обновлений» на «факты».

Вы должны использовать:

KTable record = ...
KTable result = record.groupBy(/* map records that you want to aggregate to the same key*/)
                      .reduce(...).
...