Как работает эта агрегация в потоке Кафки? - PullRequest
0 голосов
/ 06 октября 2018

Я новичок в Apache Kafka.Я прочитал код приложения Steam и наткнулся на операцию агрегации.Я пытаюсь понять это самостоятельно, и мне нужно подтверждение, если я правильно истолковываю.

Ниже приведен фрагмент кода для чтения из темы и агрегирования,

// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);


KStreamBuilder builder = new KStreamBuilder();

// read from the topic 'bank-transactions' as `KStream`. I provided the producer below 
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");

// we define the groupping and aggregation here 
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
    .aggregate(
            () -> initialBalance,
            (key, transaction, balance) -> newBalance(transaction, balance),
            jsonSerde,
            "bank-balance-agg"
    );

Поток данных в тему bank-transactions создается следующим образом:

public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty json {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();

    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);

    // Instant.now() is to get the current time using Java 8
    Instant now = Instant.now();

    // we write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());

    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}

Начальный баланс инициируется следующим образом:

// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();

initialBalance.put("count", 0);
initialBalance.put("balance", 0);
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

Метод newBalance берет транзакцию и баланс и возвращает новый баланс,

private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
    // create a new balance json object
    ObjectNode newBalance = JsonNodeFactory.instance.objectNode();

    newBalance.put("count", balance.get("count").asInt() + 1);
    newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

    Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
    Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();

    Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
    newBalance.put("time", newBalanceInstant.toString());

    return newBalance;
}

У меня есть 2 вопроса огруппировка и агрегация

а.Является ли groupByKey группировкой по Serdes.String(), а jsonSerde выполняет только сериализацию и десериализацию данных Steam?Serdes.String() - это строка имени в методе newRandomTransaction.

б.Мое утверждение заключается в том, что key, transaction внутри функции aggregation строки (key, transaction, balance) -> newBalance(transaction, balance) читается из темы bank-transactions, а balance идет от initialBalance из предыдущей строки.Это верно?

Я также озадачился, пытаясь отладить приложение, хотя оно работает без проблем.

1 Ответ

0 голосов
/ 06 октября 2018

Является ли groupByKey группированием по Serdes.String (), а jsonSerde выполняет только сериализацию и десериализацию для данных steam?

Да, groupByKey группирует по ключам, которые могут быть десериализованы и сопоставлены как строкипрочитано в теме банковских операций, и остаток поступает из initialBalance из предыдущей строки

Почти.Инициализатор находится по первому параметру, да, но агрегированный результат переносится на протяжении всего выполнения приложения, бесконечно агрегируя.

Другими словами, вы всегда начинаете с initialBalance, затем для каждого одинакового ключа вы добавляете баланс transaction к текущему накопленному balance для этого ключа.Если вы еще не видели, чтобы ключ повторялся, только тогда он будет добавлен к начальному балансу

И да, ваша тема ввода была определена методом KStreams builder.stream

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...