KafkaStream KTable dump - PullRequest
       2

KafkaStream KTable dump

0 голосов
/ 27 января 2019

Мой вопрос касается выгрузки KTable, значения которого соответствуют определенным критериям при получении триггерного сообщения.

Вот пример этой проблемы:

KTable - CurrentAccountBalance

   John    +10,
   Joe     -1,
   Alice   -2,
   Jill    +5,

Мое требование - извлечь все записи, имеющие отрицательный баланс для входящего события: FETCH_NEGATIVE_BALANCE_ENTRIES, которое поступает в другой поток команд.

Я думал так: если мы сделаем leftJoin для CurrentAccountBalance KTable с потоком команд мы можем сбросить все записи CurrentAccountBalance (которые могут быть использованы для фильтрации), но этого не происходит.

ValueJoiner из leftJoin получает только команду справа и ноль слева (вместо всех записей CurrentAccountBalance).Я что-то упустил?

спасибо

Ответы [ 2 ]

0 голосов
/ 28 января 2019

Вот решение, которое я реализовал для вывода записей KTable, соответствующих определенным критериям.Описать его здесь для тех, кто ищет подобное решение в будущем.

  1. Создан пользовательский преобразователь (FlatMapper) для генерации сообщений, удовлетворяющих условию.Я передал хранилище состояний в качестве аргумента конструктора этому преобразователю.
  2. Создал событие с именем «FLUSH_NEGATIVE_BALANCES» и прочитал его в новом KStream с именем commandStream
  3. Когда получена эта новая команда FLUSHЯ вызываю flatmapper, созданный на шаге 1, для генерации сообщений, удовлетворяющих условию (отрицательные сальдо в моем случае)
  4. Я записываю вывод flatmapper в поток приемника.

    / *Вот код для flatmapper, который является Шагом 1 выше описания * /

    открытый класс NegativeBalanceMapper реализует KeyValueMapper >> {private final BusinessRuleValidator businessRuleValidator;private final StoreHolder storeHolder;

    public NegativeBalanceMapper(StoreHolder storeHolder, BusinessRuleValidator businessRuleValidator)
    {
        this.storeHolder = storeHolder;
        this.businessRuleValidator = businessRuleValidator;
    }
    
    public Iterable<KeyValue<String, BalanceEntry>> apply(String key, CommandEntry value)
    {
        List<KeyValue<String, BalanceEntry>> result = new ArrayList<>();
        if (value == null)
        {
            return result;
        }
    
        final ReadOnlyKeyValueStore<String, BalanceEntry> ledgerStore = storeHolder.getLedgerStore();
        if (ledgerStore != null)
        {
            KeyValueIterator<String, BalanceEntry> range = ledgerStore.all();
            while (range.hasNext())
            {
                KeyValue<String, BalanceEntry> next = range.next();
                if (businessRuleValidator.isNegativeBalance(next.value))
                {
                    result.add(new KeyValue<>(next.key, next.value));
                }
            }    
        }
        return result;
    }
    

    }

    / Вот как она вызывается при получении команды FLUSH-шаг2 выше / конечный KStream негативные баллы = FilterCommandStream.flatMap (absoluteBalanceMapper);

    / Здесь я выталкиваю его на уровень приемника-4 выше /

    absoluteBalances.to (KafkaConstants.TOPIC_NEGATIVE_BALANCES, Produced.with (Serdes).String (), serdeRegistry.getBalanceEntrySerde ()));

Спасибо Матиасу за руководство.

0 голосов
/ 27 января 2019

Одно входное сообщение в соединении KTable создаст одно (или ноль) выходных сообщений в Kafka Stream, поскольку объединения KTable основаны на первичных ключах.

Что вы могли бы сделать, это создать пользовательский transformer, который получил команды и который связан с состоянием KTable (если вы задаете имя хранилища KTable, вы можете легко подключить состояние к преобразователю). Когда команда получена, вы можете получить итератор state.all() и найти все записи, которые соответствуют вашему условию, и выдать их через context.forward() плюс приемник, который находится ниже по потоку от преобразователя.

...