Требуется неожиданно много времени для преобразования сообщения в приложении kstreams. - PullRequest
1 голос
/ 10 октября 2019

У меня есть довольно простое приложение kstreams, в котором я разбираю сообщения в течение нескольких секунд, а затем использую transform, чтобы удалить или сохранить сообщение в хранилище состояний. У меня также есть метод пунктуации, который запускается каждые 30 секунд, чтобы пройти через хранилище и выдать сообщения.

Я обнаружил, что с момента, когда мое приложение получает сообщение, до момента, когда оно поступает вФункция преобразования занимает намного больше времени, чем я ожидала (я предполагаю, что функция преобразования происходит довольно быстро после истечения срока действия окна). Это не совсем проблема для моего варианта использования, но мне любопытно, что может потребоваться так много времени, чтобы перейти к функции преобразования.

    final StreamsBuilder builder = new StreamsBuilder();
    final StoreBuilder<KeyValueStore<String, Payload>> store = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(keyValueStoreName),
            Serdes.String(),
            avroSerde
    );
    builder.addStateStore(store);

    final Consumed<String, Payload> consumed = Consumed.with(Serdes.String(), avroSerde)
            .withTimestampExtractor(new WallclockTimestampExtractor());
    final Produced<String, Payload> produced = Produced.with(Serdes.String(), avroSerde);
    final KStream<String, Payload> stream = builder.stream(inputTopic, consumed);
    final SessionWindows sessionWindows = SessionWindows
            .with(Duration.ofSeconds(2));
    final SessionWindowTransformerSupplier transformerSupplier =
            new SessionWindowTransformerSupplier(keyValueStoreName, scheduleTimeSeconds);
    final SessionBytesStoreSupplier sessionBytesStoreSupplier = Stores.persistentSessionStore(
            "debounce-window",
            Duration.ofSeconds(3));
    final Materialized<String, Payload, SessionStore<Bytes, byte[]>> materializedAs =
            Materialized.as(sessionBytesStoreSupplier);

    stream
            .selectKey((key, value) -> {
                logger.info("selecting key: " + key);
                return key;
            })
            .groupByKey()
            .windowedBy(sessionWindows)
            .reduce(payloadDebounceFunction::apply, materializedAs)
            .toStream()
            .transform(transformerSupplier, keyValueStoreName)
            .to(outputTopic, produced);

    return builder;

Вот мой метод преобразования / пунктуации:

@Override
public void init(ProcessorContext context) {
    this.processorContext = context;
    this.store = (KeyValueStore<String, Payload>) context.getStateStore(keyValueStoreName);
    context.schedule(ofSeconds(scheduleTime), WALL_CLOCK_TIME, timestamp -> punctuate());
}

@Override
public KeyValue<String, Payload> transform(Windowed<String> key, Payload value) {
    synchronized (this) {
        if(value != null) {
            BatchScanStatus status = extractStatus(value);
            boolean removeFromStoreStatus = BatchScanStatus.CANCELLED.equals(status)
                    || BatchScanStatus.FINALIZING.equals(status);

            if(removeFromStoreStatus) {
                logger.info("Deleting key from store: {}", key);
                store.delete(key.key());
            } else {
                logger.info("Adding key to store: {}", key);
                store.putIfAbsent(key.key(), value);
            }
            processorContext.commit();
        }
        return null;
    }
}

private void punctuate() {
    synchronized (this) {
        final KeyValueIterator<String, Payload> keyIter = store.all();
        while(keyIter.hasNext()) {
            final KeyValue<String, Payload> record = keyIter.next();
            logger.info("Forwarding key: {}", record.key);
            processorContext.forward(record.key, record.value);
        }

        keyIter.close();
    }
}

Время, которое требуется для перехода от функции selectKey к функции преобразования, сбивает меня с толку, поскольку в этом запуске потребовалось ~ 24 секунды

15:58:35.238 [scheduler-79112bd0-2310-482e-9aab-8bcaae746082-StreamThread-1] INFO  c.b.d.f.s.kstreams.Scheduler - selecting key: keykeykey
15:58:59.181 [scheduler-79112bd0-2310-482e-9aab-8bcaae746082-StreamThread-1] INFO  c.b.d.f.s.k.s.SessionTransformer - Adding key to store: [keykeykey@1570737515238/1570737515238]

Делает ли kstream больше работы, чем кажетсяздесь для того, чтобы что-то подобное заняло столько времени, сколько это занимает? Надеясь получить некоторое представление о том, является ли это проблемой конфигурации / синхронизации или это нормальное поведение для приложения kstreams.

РЕДАКТИРОВАТЬ: я думаю, что я нашел, где я ошибся, и это связано сзначение по умолчанию commit.interval.ms.

Изменения не были зафиксированы во внутренней теме, пока она не проверена, и, таким образом, моя функция преобразования не сработает, пока эти изменения не поступят во внутреннюю тему. Я сократил это до секунды и сразу увидел разницу.

...