Что происходит, когда GlobalKTable загружается из темы с одинаковым ключом в разных темах? - PullRequest
0 голосов
/ 15 мая 2019

У нас была уплотненная тема с одним разделом, и мы добавили в нее один новый раздел.

Мы не переразбили существующие данные - это означает, что события, загруженные до добавления новых разделов, все еще находятся в разделе 0. И новые события хранятся в соответствии со стандартной политикой, как и ожидалось: все события с одним и тем же ключом в одном разделе.

В данный момент мы находимся в таком случае:

Partition    Offset    Timestamp      Key         Value
0            586       1545388284240  COD_ISIN    AAA
1            983       1551800369978  COD_ISIN    BBB
1            1141      1556526044144  COD_ISIN    CCC

Когда я загружаю эту тему в GlobalKTable, значение в магазине равно AAA. И мы, очевидно, ожидали, что текущее значение будет CCC.

GlobalKTable<String, JsonNode> storeDatacatalog = builder.globalTable(TOPIC, consumed,  Materialized.as(STORE_DATACATALOG));

KStream<String, JsonNode> inEvent = builder.stream(OTHER_TOPIC, consumed);

inEvent = inEvent.transform(
    new TransformerSupplier<String, JsonNode, KeyValue<String, JsonNode>>() {

        @Override
        public Transformer<String, JsonNode, KeyValue<String, JsonNode>> get() {

            return new Transformer<String, JsonNode, KeyValue<String, JsonNode>>() {

                private ProcessorContext context;
                private KeyValueStore<String, JsonNode> dataCatalogueState;

                @Override
                public void init(ProcessorContext context) {

                    this.context = context;
                    this.dataCatalogueState = (KeyValueStore<String, JsonNode>) context.getStateStore(STORE_DATACATALOG);

                    LOGGER.debug("Content of dataCatalogueState: ");
                    KeyValueIterator<String, JsonNode> allDc = this.dataCatalogueState.all();

                    JsonNode valueForIsin = null;

                    while (allDc.hasNext()) {
                        try {
                            KeyValue<String, JsonNode> next = allDc.next();
                            LOGGER.debug(" | " + next.key + " : " + next.value);
                            if ("COD_ISIN".equals(next.key)) 
                                valueForIsin = next.value;
                        } catch (Exception e) {
                            LOGGER.debug("exc" , e.getMessage());
                        }
                    }
                    LOGGER.info(" COD_ISIN ---> " + valueForIsin);
                }

                @Override
                public void close() {
                }

                @Override
                public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
                    return new KeyValue<>(key, value);
                }

                //@Override
                public KeyValue<String, JsonNode> punctuate(long timestamp) {
                    // TODO Auto-generated method stub
                    return null;
                }
            };
        }
    }
)

Как GlobalKTable строит свое состояние? Это на Offset или на Timestamp? Прикрепляет ли он ключ к первому разделу, где он найден?

Я знаю, как обойти (очистить тему и заполнить заново - стратегия разделения будет применена). Но мне любопытно, как это работает внутри.

1 Ответ

1 голос
/ 19 мая 2019

GlobalKTable предполагается, что данные разделены по ключу.Следовательно, если у вас есть записи с одним и тем же ключом в разных разделах, нет гарантии, что запись заказа будет применена.Заказ гарантирован только на раздел.Кроме того, атм, обновления основаны на смещениях только внутри раздела.

Используя ваш приведенный выше пример, порядок может быть

  • AAA, BBB, CCC
  • BBB, AAA, CCC
  • BBB, CCC, AAA

Гарантируется, что BBB будет применен до CCC.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...