Я новичок в Apache Kafka.Я прочитал код приложения Steam и наткнулся на операцию агрегации.Я пытаюсь понять это самостоятельно, и мне нужно подтверждение, если я правильно истолковываю.
Ниже приведен фрагмент кода для чтения из темы и агрегирования,
// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStreamBuilder builder = new KStreamBuilder();
// read from the topic 'bank-transactions' as `KStream`. I provided the producer below
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
// we define the groupping and aggregation here
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> initialBalance,
(key, transaction, balance) -> newBalance(transaction, balance),
jsonSerde,
"bank-balance-agg"
);
Поток данных в тему bank-transactions
создается следующим образом:
public static ProducerRecord<String, String> newRandomTransaction(String name) {
// creates an empty json {}
ObjectNode transaction = JsonNodeFactory.instance.objectNode();
Integer amount = ThreadLocalRandom.current().nextInt(0, 100);
// Instant.now() is to get the current time using Java 8
Instant now = Instant.now();
// we write the data to the json document
transaction.put("name", name);
transaction.put("amount", amount);
transaction.put("time", now.toString());
return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
Начальный баланс инициируется следующим образом:
// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
initialBalance.put("count", 0);
initialBalance.put("balance", 0);
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());
Метод newBalance
берет транзакцию и баланс и возвращает новый баланс,
private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
// create a new balance json object
ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
newBalance.put("count", balance.get("count").asInt() + 1);
newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());
Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
newBalance.put("time", newBalanceInstant.toString());
return newBalance;
}
У меня есть 2 вопроса огруппировка и агрегация
а.Является ли groupByKey
группировкой по Serdes.String()
, а jsonSerde
выполняет только сериализацию и десериализацию данных Steam?Serdes.String()
- это строка имени в методе newRandomTransaction
.
б.Мое утверждение заключается в том, что key, transaction
внутри функции aggregation
строки (key, transaction, balance) -> newBalance(transaction, balance)
читается из темы bank-transactions
, а balance
идет от initialBalance
из предыдущей строки.Это верно?
Я также озадачился, пытаясь отладить приложение, хотя оно работает без проблем.