Kafka Stream для сортировки сообщений по ключу метки времени в сообщении json - PullRequest
0 голосов
/ 13 июня 2018

Я публикую Kafka с сообщениями JSON, например:

"UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:40.200Z,"Comments":0,"Like":6
"UserID":222,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":1,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:44.600Z,"Comments":3,"Like":12

Я хочу отсортировать сообщения на основе UpdateTime в 10-секундном временном окне с использованием потоков Kafka и отодвинуть отсортированные сообщения в другой теме Kafka,Я создал поток, который считывает данные из входной темы, а затем я создаю TimeWindowedKStream после groupByKey(), где UserID является ключом в сообщении (хотя не обязательно groupByKey, а затем сортировать, но я могне получить WindowedBy напрямую).Но я не могу сортировать сообщения в 10-секундном окне, основываясь на UpdateTime дальше.Мой исходный код:

public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sorting");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("UnsortedMessages");
        TimeWindowedKStream<String, String> countss = source.groupByKey().windowedBy(TimeWindows.of(10000L)
                 .until(10000L));
        /*
        SORTING CODE
            */
        outputMessage.toStream().to("SortedMessages", Produced.with(Serdes.String(), Serdes.Long()));
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-sorting-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

Большое спасибо заранее.

1 Ответ

0 голосов
/ 14 июня 2018

Если вы хотите сортировать сообщения, игнорируя ключ, имеет смысл делать это только на основе разделов, а также только в том случае, если входная тема имеет такое же количество разделов, что и выходная тема.В этом случае вы должны извлечь номер раздела и использовать его в качестве ключа сообщения (см .: https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information)

Для сортировки это более сложно. Обратите внимание, что Kafka Streams следует модели «непрерывного вывода» и выдаетобновления для каждой входной записи с использованием DSL. Таким образом, может быть лучше использовать Processor API. Вы должны использовать Processor с подключенным хранилищем и помещать записи в хранилище. В качестве структуры в памяти вы храните отсортированный списокВ то время как время идет, вы можете испустить «готовые» окна и удалить соответствующие записи из хранилища.

Я не думаю, что вы можете построить это с помощью DSL.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...