Kafka Streams собирает элементы потоков в списки - PullRequest
0 голосов
/ 26 апреля 2018

Я хочу частично собирать элементы в списки в потоках Кафки через определенный промежуток времени или после обработки 1000 элементов.

KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> kStream = builder.stream(topicTwo);
        KTable<String, String> kTable = builder.table(topicOne);

        kStream.join(kTable,
                (streamValue, tableValue) -> new CustomObject(streamValue, tableValue)
                .foreach((key, value) -> System.out.println(value));

        KafkaStreams streams = new KafkaStreams(builder, streamProperties);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Это мой код.Я не знаю, достаточно ли я прояснил себя, но я хочу получить List<CustomObject> после каждых 1000 обработанных элементов или 5 секунд.Возможно ли это?

1 Ответ

0 голосов
/ 26 апреля 2018

Для этого я считаю, что вам нужно определить пользовательский Трансформатор .

В методе transform добавьте сообщения в список. Если размер списка достигает 1000 элементов, создайте новый список и верните старый.

В своем методе init schedule функция пунктуации в ProcessorContext, которая генерирует список элементов, которые старше вашего временного окна.

Используйте метод transform в KStream после объединения, чтобы добавить собственный преобразователь в топологию.

В вашем преобразователе, вероятно, лучше хранить буферизованные элементы в StateStore , а не в списке в памяти, чтобы гарантировать, что никакие сообщения не будут потеряны при отработке отказа / перебалансировке. Государственные магазины находятся в Кафке.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...