Накопление событий из нескольких тем с использованием потоков Kafka - PullRequest
0 голосов
/ 07 марта 2020

Я прошу прощения, если это глупый вопрос.

У меня есть сценарий, согласно которому у меня есть 3 темы из вышестоящей службы (которые не имеют ключа). К сожалению, я не могу изменить поведение 3 тем.

Основная часть службы верхнего уровня публикует все сообщения в конце дня, и мне нужно получить общее представление о транзакциях, так как заказ транзакций имеет значение для нисходящего сервиса.

Я понимаю, что не могу переупорядочить сообщения в разных разделах тем, поэтому я подумал, смогу ли я их накапливать, и тогда мой сервис мог бы взять накопленный результат и переупорядочьте их перед обработкой.

Однако я замечаю странное поведение и надеюсь, что кто-то сможет уточнить, что мне не хватает.

Когда я выполняю операцию с 1 для 500 учетных записей я вижу 500 сообщений, собранных и отображенных в выходных данных topi c.

Однако, когда я пытаюсь выполнить ту же операцию с 10 000 учетными записями, я вижу больше выходных данных, чем должно быть. (13 000 сообщений на выходе topi c).

    KStream<String, TransactionAccumulator> transactions =
        disbursements
            .merge(repayments)
            .merge(fees)
            .groupBy(
                (k, v) -> v.getAccountId(),
                with(
                    String(),
                    serdeFrom(
                        new JsonSerializer<>(mapper),
                        new JsonDeserializer<>(Transaction.class, mapper))))
            .windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
            .aggregate(
                TransactionAccumulator::new,
                (key, value, aggregate) -> aggregate.add(value),
                (aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
                Materialized.with(
                    String(),
                    serdeFrom(
                        new JsonSerializer<>(mapper),
                        new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
            .toStream((key, value) -> key.key());

Как указывалось ранее, основная часть вышестоящей службы публикует все события в конце дня (вместо реального времени).

Буду признателен за то, что мне здесь не хватает, так как для небольших томов, похоже, работает.


Обновление 1

Я попробовал предложенное использование suppression чтобы попытаться отправить только окончательное окно.

Однако, при использовании этого, он в основном не публикует sh никаких сообщений на выходе topi c, хотя я вижу, что в "KTABLE-SUPPRESS-STATE-STORE" * есть сообщения

Обновленный код с подавлением выглядит следующим образом.

   disbursements
        .merge(repayments)
        .merge(fees)
        .groupBy(
            (key, value) -> value.getAccountId(),
            Grouped.with(
                Serdes.String(),
                Serdes.serdeFrom(
                    new JsonSerializer<>(mapper),
                    new JsonDeserializer<>(Transaction.class, mapper))))
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMinutes(1)))
        .aggregate(
            TransactionAccumulator::new,
            (key, value, aggregate) -> aggregate.add(value),
            Materialized.with(
                Serdes.String(),
                Serdes.serdeFrom(
                    new JsonSerializer<>(mapper),
                    new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        .mapValues(
            value -> {
              LOGGER.info(
                  "Sending {} Transactions for {}",
                  value.getTransactions().size(),
                  value.getAccountId());
              return value;
            })
        .toStream((key, value) -> key.key());

Я также не вижу введенных сообщений журнала. Для ясности я использую Spring Cloud Stream в этом эксперименте, и последние записи журнала, которые я вижу в потоковом приложении, выглядят следующим образом.

INFO 23436 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [StreamConsumer-consume-applicationId-de25a238-5f0f-4d84-9bd2-3e7b01b7f0b3] State transition from REBALANCING to RUNNING
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode

1 Ответ

0 голосов
/ 08 марта 2020

Извините, я пока не могу комментировать, но вот мои два цента:

  1. KGroupedStream.aggregate(): Kafka Stream использует кэш записей для управления скоростью, с которой агрегированные обновления отправляются из материализованного представления (или таблицы KTable) aggregate в хранилище состояний и последующий процессор. Например, с сообщениями:
("word1", 4)
("word1", 2)
("word2", 3)
("word1", 1)

И ваша топология подсчета слов:

wordCntPerSentenceKStream
    .groupByKey()
    .aggregate(() -> 0, (word, newWordCnt, aggsWordCnt) -> aggsWordCnt + newWordCnt, Materialized.as("word-cnt-store").withValueSerde(Serdes.Integer())
    .toStream();

вы можете получать последующие сообщения, подобные этому:

("word1", 6)
("word2", 3)
("word1", 7)

Так что мой Предполагается, что ваш ввод topi c может содержать несколько транзакций для одного AccountId, и кэш записей очищается, когда кэш (cache.max.bytes.buffering) заполнен или commit.interval.ms установлен.

Если ваш приемник идемпотентен, вы можете просто переопределить свой TransactionAccumulator с помощью нового ключа сообщения или можете использовать KTable.suppress(), как указано здесь , чтобы выдать только последнее сообщение в агрегированном окне.
...