Горячий, чтобы создать разницу Stream, используя Kafka Streams в Java? - PullRequest
0 голосов
/ 03 июля 2019

Я пытаюсь создать поток «Разница» из KStream в Kafka Java.

У меня есть входной поток, где значения представляют собой набор значений Double V0… Vn. Выходной поток должен вычислять разницу между V0 - 0, V1 - V0, V2 - V1… Vn –Vn-1.

Моей первой идеей было сделать что-то вроде этого:

    KStream<String, Double> stream = builder.stream(TOPIC)

    KTable<String, Double> difference = stream.groupByKey().reduce(
            (oldValue, newValue) -> {
              return newValue - oldValue
            }
    ).toStream()

Допустим, у меня есть вход KStream со следующими значениями:

Key  -> Value
"A1" -> 2 
"B2" -> 4
"A1" -> 6
"A1" -> 10
"B2" -> 13 
"A1" -> 7

Я хотел бы создать новый поток вывода со следующими значениями:

Key  -> Value
"A1" ->  2  (2-0  =  2) 
"B2" ->  4  (4-0  =  4)
"A1" ->  4  (6-2  =  4)
"A1" ->  4  (10-6 =  4)
"B2" ->  9  (13-4 =  9)
"A1" -> -3  (7-10 = -3)

1 Ответ

0 голосов
/ 03 июля 2019

Вы можете использовать что-то вроде

        stream.groupByKey().aggregate(Diff::new, new Aggregator<String, Double, Diff>() {

        @Override
        public Diff apply(String key, Double newValue, Diff aggregate) {
            Double difference = newValue - aggregate.getLastValue();
            aggregate.setDifference(difference);
            aggregate.setLastValue(newValue);
            return aggregate;
        }
        }).mapValues(new ValueMapper<Diff, Double>() {

        @Override
        public Double apply(Diff value) {
            return value.getDifference();
        }

    }).toStream().to("diff");

где

public class Diff {

  private Double lastValue = 0d;

  private Double difference = 0d;
  //getters and setters
  // ...
}
...