Потоки Кафки - Агрегация со старым состоянием - PullRequest
0 голосов
/ 25 апреля 2018

У меня есть KStream с данными из темы to1 , как это:

T1-KEY -> {T1}
T2-KEY -> {T2}

и KTable , сконструированные следующим образом:

Я использую org.apache.kafka.streams.StreamsBuilder , чтобы создать KTable из некоторой темы to2 , которая выглядит следующим образом:

A1-KEY -> { "A1", "Set": [
                          {"B1", "Rel": "T1"},
                          {"B2", "Rel": "T1"}
                         ]
          } 

..

Затем поток выводится на карту и группируется по Key s.t. результирующий KTable выглядит так:

T1 -> { ["B1", "B2"] }

Позже, теперь в теме появляется следующее сообщение to2 :

A1-KEY -> { "A1", "Set": [
                          {"B2", "Rel": "T1"}
                         ]
          } 

Теперь я ожидаю, что моя KTable отразит изменения и будет выглядеть следующим образом:

T1 -> { ["B2"] }

но это выглядит так:

T1 -> { ["B1", "B2"] }

Я заметил, что в моем Aggregator<Tx-KEY, Bx, Set<Bx>> последним аргументом является набор ["B1", "B2"], хотя, когда я смотрю перед агрегацией, я получаю только одно совпадение "B2".

Я неправильно понимаю совокупность или что здесь происходит?

EDIT

Думаю, я сузил это: очевидно, Initializer агрегата вызывается только для очень в первый раз - после этого агрегат всегда получает last aggregate в качестве последнего аргумента, например,

@Override
public Set<Bx> apply(Tx-KEY, Bx value, Set<Bx> aggregate) {

}

, где Set<Bx> aggregate - это [] для самого первого вызова (созданного с помощью инициализатора), но ["B1", "B2"] для второго вызова.

Есть идеи?

РЕДАКТИРОВАТЬ 2

public class MyAggregator implements Aggregator<Tx-KEY, Bx, Set<Bx>> {

    @Override
    public Set<Bx> apply(Tx-KEY key, Bx value, Set<Bx> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}

РЕДАКТИРОВАТЬ 3

Я не могу использовать только плоскую карту, так как мне нужно объединить несколько элементов Ax, например,

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          },
A2-KEY -> { "A2", "Set": [
                      {"B2", "Rel": "T1"}
                     ]
          },
...

где я тогда ожидаю какую-то группу, например

T1 -> { ["B1", "B2"] }

и в следующей итерации, когда сообщение

A1-KEY -> { "A1", "Set": [
                      {"B1", "Rel": "T1"}
                     ]
          }

прибывает, я ожидал

T1 -> { ["B1"] }

..

1 Ответ

0 голосов
/ 25 апреля 2018

Обратите внимание, что в агрегаторе вы только добавляете элементы в агрегатный набор.С этой логикой ваш набор (для данного ключа) никогда не может сжиматься.Я думаю, что в этом случае вы слишком сильно сгладили поток.Я предлагаю вам не выравнивать его так, чтобы ваши сообщения имели форму (Tx-KEY key, Bx value), а вместо этого, чтобы они всегда сохраняли свою установленную форму: (Tx-KEY key, Set<Bx> value).Вам не нужно агрегирование тогда вообще.Для этого я предлагаю вам преобразовать входной набор

"Set": [
     {"B1", "Rel": "T1"},
     {"B2", "Rel": "T1"}
]

в

T1 -> { ["B1", "B2"] }

, сгруппировав его по полю "Rel", используя стандартный код Java (Collections или Streams api) внутри KStreamВызов метода flatmap для того, чтобы вы когда-либо отправляли в KStream только сообщения со значениями Set<Bx> -типов, а не Bx -типами по отдельности.

С удовольствием уточним, если вы предоставите код для вашей текущей плоской картыосуществление.

...