как повторно использовать поток кафки, чтобы не открывать поток снова и снова - PullRequest
1 голос
/ 21 марта 2019

Создан поток из темы и успешно получены мои данные на основе фильтра после stream.start ().но моя проблема заключается в том, что, когда я пытаюсь снова выполнить фильтрацию ... нужно пройти, чтобы создать компоновщик и ksteram ... эти kStream будут искать данные от начального смещения темы 0 до конца и производить результат ... однако ... этот просмотрэта тема из 0 все мои просьбы не помогают получить соответствующий SLA ... есть ли способ, которым мы можем добиться, чтобы купить загрузку потока один раз и выполнить агрегацию, когда я захочу?

Я всегда запускаю и закрываю свой поток ...

, пожалуйста, помогите, как достичь, и любая примерная программа сделает

мой пример программы

  public class EventStream{
    public static void create stream(String[] args) {
            // TODO Auto-generated method stub
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event_stream");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Tracking> trackevent= build.stream("event",Consumed.with((serdes.String(),Serds.serderFrom(test.calss,test.class))));
            trackevent.filter((k,v) -> "test".equals(v.getStr())).foreach((k,v) -> System.Out.Println(V));
            trackevent.cleanUp();
            trackevent.start;

        }

    }
...