Как понять агрегацию потоков кафки? - PullRequest
0 голосов
/ 07 октября 2019

Я новичок в kafka и изучаю его. Я просто работаю над агрегацией данных для сотрудников, но сталкиваюсь с проблемами. Может кто-нибудь, пожалуйста, помогите.

У меня есть тема timeoffs с ключом time_off_id и значением типа объекта, который также содержит идентификатор сотрудника. Поэтому я хочу построить магазин, в котором идентификатор сотрудника должен быть ключом, а значение должно быть списком выходных времени этого сотрудника. Но я придерживаюсь нижеприведенного подхода, но сталкиваюсь с проблемой. При агрегировании данных в методе указывается неверный тип возврата: невозможно преобразовать ArrayList в VR. Можете ли вы помочь мне.

Код:

KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
    (key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
  aggValue.add(newValue);
  return aggValue;
}, Materialized.as("NewStore"));

Я также попробовал этот подход, но опять-таки это не решило проблему.

Класс TimeOffList:

package com.kafka.productiontest.models;

import java.util.ArrayList;

public class TimeOffList {
  ArrayList list = new ArrayList<TimeOff>();

  public TimeOffList add(Object s) {
    list.add(s);
    return this;
  }
}

В классе потоковой передачи:

groupedTable.aggregate(TimeOffList::new,
    (k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));

После реализации вашего решения эта проблема исчезла, но теперь столкнулась с проблемой serde. Я реализовал TimeOffListSerde. Пожалуйста, проверьте ниже код

KStream<String, TimeOff> source = builder.stream(topic);
source.groupBy((k, v) -> v.getEmployeeId())
    .aggregate(ArrayList::new,
        (key, value, aggregate) -> {
          aggregate.add(value);
          return aggregate;
        }, Materialized.as("NewStore").withValueSerde(new TimeOffListSerde(TimeOff.class)));

TimeOffListSerde.java

package com.kafka.productiontest.models;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.ArrayList;
import java.util.Map;

public class TimeOffListSerde implements Serde<ArrayList<TimeOff>> {
  private Serde<ArrayList<TimeOff>> inner;

  public TimeOffListSerde() {
  }

  public TimeOffListSerde(Serde<TimeOff> serde){
    inner = Serdes.serdeFrom(new TimeOffListSerializer(serde.serializer()), new TimeOffListDeserializer(serde.deserializer()));
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.serializer().configure(configs, isKey);
    inner.deserializer().configure(configs, isKey);
  }

  @Override
  public void close() {
    inner.serializer().close();
    inner.deserializer().close();
  }

  @Override
  public Serializer<ArrayList<TimeOff>> serializer() {
    return inner.serializer();
  }

  @Override
  public Deserializer<ArrayList<TimeOff>> deserializer() {
    return inner.deserializer();
  }
}

1 Ответ

1 голос
/ 07 октября 2019

хотел бы ты этого?

KStream<String, TimeOff> source = builder.stream(sourceTopic);
KTable<String, List<TimeOff>> table = source.groupBy((k, v) -> v.getId())
    .aggregate(ArrayList::new,
            (key, value, aggregate) -> {
                aggregate.add(value);
                return aggregate;
            }, Materialized.as("NewStore"));
...