Вычисление среднего с использованием ValueTransformerWithKey Или я могу использовать функции агрегации Кафки - PullRequest
0 голосов
/ 25 июня 2019

У меня есть поток объектов, где я хотел бы рассчитать среднее значение поля в этом объекте, а затем сохранить это среднее обратно на объект. Я бы хотел, чтобы у меня было 5 минут падающего окна с задержкой 1 час. Я новичок в Kafka, поэтому мне интересно, если это правильный способ решения проблемы.

Сначала я создаю постоянное хранилище:

StoreBuilder<WindowStore<String, Double>> averagesStoreSupplier =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofHours(1), Duration.ofMinutes(5), true),
        Serdes.String(),
        Serdes.Double());

streamsBuilder.addStateStore(averagesStoreSupplier);

Затем я вызываю мой трансформатор, используя:

otherKTable
    .leftJoin(objectKTable.transformValues(new AveragingTransformerSupplier(WINDOW_STORE_NAME), WINDOW_STORE_NAME), 
            myValueJoiner)
    .to("outputTopic")

А вот и мой трансформатор:

public class AveragingTransformerSupplier implements ValueTransformerWithKeySupplier<String, MyObject, MyObject> {

    private final String stateStoreName;

    public TelemetryAveragingTransformerSupplier(final String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    public ValueTransformerWithKey<String, MyObject, MyObject> get() {
        return new ValueTransformerWithKey<>() {

            private WindowStore<String, Double> averagesStore;

            @Override
            public void init(ProcessorContext processorContext) {
                averagesStore = Try.of(() ->(WindowStore<String, Double>) processorContext.getStateStore(stateStoreName)).getOrElse((WindowStore<String, Double>)null);
            }

            @Override
            public MyObject transform(String s, MyObject myObject) {
                if (averagesStore != null) {
                    averagesStore.put(s, myObject.getNumber());

                    Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
                    Instant timeTo = Instant.now();
                    WindowStoreIterator<Double> itr = averagesStore.fetch(s, timeFrom, timeTo);

                    double sum = 0.0;
                    int size = 0;
                    while(itr.hasNext()) {
                        KeyValue<Long, Double> next = itr.next();
                        size++;
                        sum += next.value;
                    }

                    myObject.setNumber(sum / size);

                }

                return myObject;
            }

            @Override
            public void close() {
                if (averagesStore != null) {
                    averagesStore.flush();
                }
            }
        };
    }
}

У меня есть пара вопросов. Во-первых, является ли способ, которым я определяю WindowStore, правильный способ формирования акробатического окна? Как бы я создал скачкообразное окно?

Во-вторых, внутри моего трансформера я получаю все предметы из магазина с начала времен до наших дней. Поскольку я определил его как 5-минутное окно и 1-часовое время хранения, означает ли это, что товары в магазине представляют собой снимок данных за 5 минут? Что здесь делает удержание?

У меня это работает над тривиальными случаями, но я не уверен, есть ли лучший способ сделать это с помощью агрегаций и объединений, или даже если я делаю это правильно. Кроме того, мне пришлось окружить извлечение получения хранилища попыткой catch, потому что инициализация вызывается несколько раз, и иногда я получаю Processor has no access to StateStore исключение.

1 Ответ

0 голосов
/ 04 июля 2019

Я бы рекомендовал использовать DSL вместо Processor API для этого варианта использования. Ср https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns для деталей.

У меня есть пара вопросов. Во-первых, является ли способ, которым я определяю WindowStore, правильный способ формирования акробатического окна? Как бы я создал скачкообразное окно?

Оконное хранилище можно использовать как для скачкообразного, так и для падающего окна - это зависит от того, как вы используете в своем процессоре, а не от того, как вы создаете хранилище, какую семантику окна вы получаете.

Во-вторых, внутри моего трансформера я получаю все предметы из магазина с начала времен до наших дней. Поскольку я определил его как 5-минутное окно и 1-часовое время хранения, означает ли это, что товары в магазине представляют собой снимок данных за 5 минут? Что здесь делает удержание?

Параметр windowSize при создании магазина работает не так, как вы ожидаете. Вам нужно было бы вручную закодировать логику управления окнами в своем коде Transformer, используя put(key, value, windowStartTimestamp) - atm, вы используете put(key, value), который использует context.timestamp(), то есть метку времени текущей записи, как windowStartTimestamp - я сомневаюсь это то, что вы хотите. Время хранения зависит от временных отметок окна, т. Е. Старые окна будут удалены после истечения срока их действия.

...