На данный момент лучший рабочий подход, который я нашел, - это использовать смещения Кафки из выходных записей.
Подход можно обобщить так:
- Выполните всю логику, которую вы хотите сделать, и не беспокойтесь о нескольких записях для одного и того же ключа.
- Запишите результаты в промежуточную тему с минимальным удержанием (1 час и т. Д.)
- Прочтите промежуточные разделы, используя Процессор и в Процессоре, обогатите сообщение с помощью Смещения Кафки, используя
context.offset()
.
- Запишите сообщения в тему вывода.
Теперь ваша тема вывода содержит несколько сообщений для одного и того же ключа, но каждое с различным смещением.
Теперь во время запроса вы можете выбрать максимальное смещение для каждой клавиши, используя подзапрос.
Пример TransformerSupplier можно увидеть ниже
/**
* @param <K> key type
* @param <V> value type
*/
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
@Override
public Transformer<K, V, KeyValue<String, String>> get() {
return new OutputTransformer<>();
}
private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
/**
* @param key the key for the record
* @param value the value for the record
*/
@Override
public KeyValue<String, String> transform(K key, V value) {
if (value != null) {
value.setKafkaOffset(context.offset());
}
return new KeyValue<>(key, value);
}
@Override
public KeyValue<String, String> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
// nothing to close
}
}
}