Использовать сообщения за последние 5 минут и циклически сбрасывать их в другую тему - PullRequest
1 голос
/ 03 мая 2019

Вот что я пытаюсь сделать:

Я пытаюсь использовать сообщения в 5-минутном временном окне и подсчитывать количество полученных сообщений.После этого я хочу скопировать последнее сообщение, поступившее в группу, скопировать его, отредактировать его поле «count», чтобы оно соответствовало количеству сообщений в временном интервале, и отправить его в тему.

Hoever,У меня проблемы с оконной частью, мне нужна помощь в этом.Я сделал свой собственный JSON Serdes (MessageSerde), и Class Class представляет сообщение JSON, но я не уверен, что я должен сейчас делать в отношении процесса создания окон.

final StreamsBuilder builder = new StreamsBuilder();

    final KStream<String, Message> inputStream = 
    builder

    .stream("users", Consumed.with(Serdes.String(), MessageSerde));

    inputStream
    .filter(new Predicate<String, Message>() {
    @Override
    public boolean test(String s, Message s2) {
        return s2.getPriority().equals("Low");
        }
    })


    KTable<Windowed<String>, Message> fiveMinuteWindowed = inputStream

    .groupByKey(Serialized.with(Serdes.String(), MessageSerde))
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .count()
    .to("test-filter", Produced.with(Serdes.String(), MessageSerde));


    final KafkaStreams streams = new KafkaStreams(builder.build(), props);

Обновление

После предложения Матиаса я создал этот код, но столкнулся с несколькими ошибками компиляции, в методе .to:

   inputStream

     .filter(new Predicate<String, Message>() {
    @Override
    public boolean test(String s, Message s2) {
        return s2.getPriority().equals("Low");
      }
    })
    .groupByKey(Serialized.with(Serdes.String(), MessageSerde))
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .aggregate(
    new Initializer<Message>() { /* initializer */
        @Override
        public Message apply() {
            Message tuple = new Message();
            return tuple;
        }
    },
    new Aggregator<String, Message, Message>() { /* adder */
        @Override
        public Message apply(String aggKey, Message curMsg, Message tuple) {

            tuple.setCount(tuple.getCount() + 1);

            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");

            try{
                Date parsedDate_cur = dateFormat.parse(curMsg.getTimestamp());
                Timestamp timestamp_cur = new java.sql.Timestamp(parsedDate_cur.getTime());
                Date parsedDate_agg = dateFormat.parse(tuple.getTimestamp());
                Timestamp timestamp_agg = new java.sql.Timestamp(parsedDate_agg.getTime());

                if(timestamp_cur.after(timestamp_agg)){
                    tuple.setTimestamp(timestamp_cur.toString());
                }

                tuple.setId(curMsg.getId());
                tuple.setSourceip(curMsg.getSourceip());
                tuple.setType(curMsg.getType());
                tuple.setPriority(curMsg.getPriority());
                tuple.setName(curMsg.getName());
                tuple.setMetadata(curMsg.getMetadata());  
            }
            catch(Exception e){
                System.out.println("Error parsing dates"); 
            }    
            return tuple;
        }
    },
    Materialized.with(Serdes.String(), MessageSerde))
    .suppress(untilWindowCloses(unbounded())) 
    .toStream();
    .to("test-filter", Produced.with(Serdes.String(), MessageSerde));

К ошибкам относятся:

Compilation failure
[ERROR] /home/x/Documents/project/src/main/java/com/streams/app/App.java:[194,9] no suitable method found for to(java.lang.String,org.apache.kafka.streams.kstream.Produced<java.lang.String,Pojo.Message>)
[ERROR]method org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>) is not applicable
[ERROR](argument mismatch; inference variable K has incompatible equality constraints org.apache.kafka.streams.kstream.Windowed<java.lang.String>,java.lang.String)
[ERROR]method org.apache.kafka.streams.kstream.KStream.to(org.apache.kafka.streams.processor.TopicNameExtractor<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>,org.apache.kafka.streams.kstream.Produced<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>) is not applicable
[ERROR](argument mismatch; java.lang.String cannot be converted to org.apache.kafka.streams.processor.TopicNameExtractor<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...