Похоже, вы хотите поделиться некоторым кодом для обычной обработки с побочным эффектом и обработки на основе пунктуации без побочного эффекта.В противном случае вы не будете использовать MyTransformer
для обоих случаев?
Таким образом, мне интересно, могли бы вы сделать все за один Transformer
вместо трех.
MyTransformer<K,V,R> implement Transformer<K,V,R> {
public void init(ProcessorContext context) {
context.schedule(..., new MyPunctuator());
}
public R transform(K key, V value) {
// for every record from the source topic do everything
doSharedStuff();
doStuffWithSideEffect();
}
private doSharedStuff() {...}
private doStuffWithSideEffect() {...}
private class MyPunctuator implements Punctuator {
public void punctuate(long timestamp) {
for(KeyValue kv : ...) { // whatever k/v-pair to want to "forward"
// for every record you want to emit delayed, do only some part
doSharedStuff();
}
}
}
}