Фильтрация с отслеживанием состояния / flatMapValues ​​в потоках Кафки? - PullRequest
0 голосов
/ 15 апреля 2019

Я пытаюсь написать простое приложение Kafka Streams (нацеленное на Kafka 2.2 / Confluent 5.2), чтобы преобразовать тему ввода с семантикой хотя бы один раз в поток вывода ровно один раз.Я хотел бы закодировать следующую логику:

  • Для каждого сообщения с заданным ключом:
    • Считать метку времени сообщения из строкового поля в значении сообщения
    • Получить наибольшую временную метку, которую мы ранее видели для этого ключа, из локального хранилища состояний
      • Если временная метка сообщения меньше или равна отметке времени в хранилище состояний, ничего не генерировать
      • Если отметка времени больше, чем отметка времени в хранилище состояний, или ключ не существует в хранилище состояний, отправьте сообщение и обновите хранилище состояний с помощью ключа / отметки времени сообщения

(Это гарантированно даст правильные результаты, основанные на гарантиях упорядочения, которые мы получаем от вышестоящей системы; я не пытаюсь здесь что-либо сделать магическим.)

Сначала я подумал, что мог бы сделать это с оператором Kafka Streams flatMapValues , который позволяет сопоставить каждое входное сообщение с нулем или несколькими выходными сообщениями с помощью одной и той же клавиши.Однако эта документация явно предупреждает:

Это операция записи без записи (см. TransformValues ​​(ValueTransformerSupplier, String ...) для преобразования значений с состоянием).

Звучит многообещающе, но в документации transformValues не ясно, как выдавать ноль или одно выходное сообщение на входное сообщение.Если только это не пытается сказать // or null в этом примере?

flatTransform также выглядело несколько многообещающе, но мне не нужно манипулировать ключом, и, если возможно, я бы хотел избежатьперераспределение.

Кто-нибудь знает, как правильно выполнять этот вид фильтрации?

1 Ответ

2 голосов
/ 15 апреля 2019

вы можете использовать Transformer для реализации операций с состоянием, как вы описали выше. Чтобы не распространять сообщение в нисходящем направлении, вам необходимо вернуть null из метода transform, это упоминается в Transformer java doc. И вы можете управлять распространением через processorContext.forward(key, value). Упрощенный пример приведен ниже

kStream.transform(() -> new DemoTransformer(stateStoreName), stateStoreName)

public class DemoTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext processorContext;
    private String stateStoreName;
    private KeyValueStore<String, String> keyValueStore;

    public DemoTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        String existingValue = keyValueStore.get(key);
        if (/* your condition */) {
            processorContext.forward(key, value);
            keyValueStore.put(key, value);
        }

        return null;
    }

    @Override
    public void close() {
    }
}
...