Может ли использование changelogs стать узким местом для самого приложения? - PullRequest
0 голосов
/ 22 мая 2019

У меня есть приложение весенних облачных потоков kafka, которое перехватывает входящие данные, чтобы иметь возможность объединять две темы: ключи выбора, значения карт и агрегировать данные. Со временем потребительские лаги увеличиваются, и масштабирование за счет добавления нескольких экземпляров приложения мало помогает. С каждым разом потребительское отставание, похоже, увеличивается.

Я увеличил и уменьшил количество экземпляров с 1 до 18, но большой разницы не заметил. Количество сообщений, от которых оно отстает, продолжает увеличиваться каждые 5 секунд независимо от количества экземпляров

KStream<String, MappedOriginalSensorData> flattenedOriginalData = originalData
                .flatMap(flattenOriginalData())
                .through("atl-mapped-original-sensor-data-repartition", Produced.with(Serdes.String(), new MappedOriginalSensorDataSerde()));


        //#2. Save modelid and algorithm parts of the key of the errorscore topic and reduce the key
        //    to installationId:assetId:tagName
        //Repartition ahead of time avoiding multiple repartition topics and thereby duplicating data
        KStream<String, MappedErrorScoreData> enrichedErrorData = errorScoreData
                .map(enrichWithModelAndAlgorithmAndReduceKey())
                .through("atl-mapped-error-score-data-repartition", Produced.with(Serdes.String(), new MappedErrorScoreDataSerde()));


        return enrichedErrorData
                //#3. Join
                .join(flattenedOriginalData, join(),
                        JoinWindows.of(
                                // allow messages within one second to be joined together based on their timestamp
                                Duration.ofMillis(1000).toMillis())
                                // configure the retention period of the local state store involved in this join
                                .until(Long.parseLong(retention)),
                        Joined.with(
                                Serdes.String(),
                                new MappedErrorScoreDataSerde(),
                                new MappedOriginalSensorDataSerde()))
                //#4. Set instalation:assetid:modelinstance:algorithm::tag key back
                .selectKey((k,v) -> v.getOriginalKey())
                //#5. Map to ErrorScore (basically removing the originalKey field)
                .mapValues(removeOriginalKeyField())
                .through("atl-joined-data-repartition");

затем часть агрегации:

        Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized
                .as(localStore.getStoreName());

        // Set retention of changelog topic
        materialized.withLoggingEnabled(topicConfig);

        // Configure how windows looks like and how long data will be retained in local stores
        TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
                localStore.getTimeUnit(), Long.parseLong(topicConfig.get(RETENTION_MS)));

        // Processing description:
        // 2. With the groupByKey we group  the data on the new key
        // 3. With windowedBy we split up the data in time intervals depending on the provided LocalStore enum
        // 4. With reduce we determine the maximum value in the time window
        // 5. Materialized will make it stored in a table
        stream.groupByKey()
        .windowedBy(configuredTimeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
    }

    private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
        TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
        timeWindows.until(retentionMs);
        return timeWindows;
    }

Я ожидаю, что увеличение количества экземпляров значительно уменьшит отставание потребителей.

Так что в этой настройке участвуют несколько тем, таких как: * оригинальные данные датчика * оценка ошибки * kstream-joinother * kstream-jointhis * atl-mapped-original-sensor-data-repartition * atl-mapped-error-score-data-repartition * atl-join-data-repartition

Идея состоит в том, чтобы объединить исходные данные датчика с оценкой ошибки. Для перебора требуется тема atl-mapped- *. затем объединение будет использовать темы kstream *, и в результате в результате объединения заполняется atl-join-data-repartition. После этого агрегация также создает темы, но я оставляю это вне области видимости.

original-sensor-data 
\
 \
  \   atl-mapped-original-sensor-data-repartition-- kstream-jointhis -\
  /   atl-mapped-error-score-data-repartition    -- kstream-joinother -\
 /                                                                      \ 
error-score                                          atl-joined-data-repartition 

Поскольку кажется, что увеличение количества экземпляров больше не оказывает большого влияния, так как я представил темы объединения и сопоставления atl, мне интересно, возможно ли, чтобы эта топология стала собственным узким местом? , Из потребительского лага кажется, что у темы "Исходные данные датчика" и "Оценка ошибок" гораздо меньше потребительского лага по сравнению, например, с темами atl-mapped- *. Есть ли способ справиться с этим, удалив эти изменения, или это приводит к невозможности масштабирования?

...