Создан поток из темы и успешно получены мои данные на основе фильтра после stream.start ().но моя проблема заключается в том, что, когда я пытаюсь снова выполнить фильтрацию ... нужно пройти, чтобы создать компоновщик и ksteram ... эти kStream будут искать данные от начального смещения темы 0 до конца и производить результат ... однако ... этот просмотрэта тема из 0 все мои просьбы не помогают получить соответствующий SLA ... есть ли способ, которым мы можем добиться, чтобы купить загрузку потока один раз и выполнить агрегацию, когда я захочу?
Я всегда запускаю и закрываю свой поток ...
, пожалуйста, помогите, как достичь, и любая примерная программа сделает
мой пример программы
public class EventStream{
public static void create stream(String[] args) {
// TODO Auto-generated method stub
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event_stream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Tracking> trackevent= build.stream("event",Consumed.with((serdes.String(),Serds.serderFrom(test.calss,test.class))));
trackevent.filter((k,v) -> "test".equals(v.getStr())).foreach((k,v) -> System.Out.Println(V));
trackevent.cleanUp();
trackevent.start;
}
}