Окно, основанное на времени события, не срабатывает - PullRequest
1 голос
/ 21 июня 2019

Я работаю над окном, основанным на времени события Flink. Но когда я посылаю сообщение Кафки, программа не выполняет оконную операцию. Я сделал все, что говорят документы, но не смог решить проблему, любая помощь будет оценена, заранее спасибо

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.getConfig();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        environment.setParallelism(1);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id","event-group");

        FlinkKafkaConsumer<EventSalesQuantity> consumer = new FlinkKafkaConsumer<EventSalesQuantity>("EventTopic",new EventSerializationSchema(),props);
        DataStream<EventSalesQuantity> eventDataStream = environment.addSource(consumer);

        KeyedStream<EventSalesQuantity, String> keyedEventStream = eventDataStream.assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarksImpl()).
           keyBy(new KeySelector<EventSalesQuantity, String>() {
               @Override
               public String getKey(EventSalesQuantity eventSalesQuantity) throws Exception {
                   return  eventSalesQuantity.getDealer();
               }
           });

        DataStream<Tuple2<EventSalesQuantity,Integer>> eventSinkStream = keyedEventStream.timeWindow(Time.seconds(5)).aggregate(new AggregateImpl());
        eventSinkStream.addSink(new FlinkKafkaProducer<Tuple2<EventSalesQuantity, Integer>>("localhost:9092","SinkEventTopic",new EventSinkSerializationSchema()));
        eventSinkStream.print();
        environment.execute();
    }
}




public class AssignerWithPeriodicWatermarksImpl implements AssignerWithPeriodicWatermarks<EventSalesQuantity> {
    private final long maxOutOfOrderness = 3500; 
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(EventSalesQuantity element, long previousElementTimestamp) {
        long timestamp = DateUtils.getDateFromString(element.getTransactionDate()).getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

"2019-06-21T09: 43: 01" "2019-06-21T09: 43: 03"

Я отправляю 2 сообщения с этими отметками времени, но я не получил вывод.

1 Ответ

2 голосов
/ 21 июня 2019
  1. Ваши временные окна для события имеют длину 5 секунд. Окно, содержащее эти события, не будет запущено до тех пор, пока оно не увидит водяной знак с отметкой времени не менее 2019-06-21T09: 43: 05.
  2. Если для параметра maxOutOfOrderness установлено значение 3500 мсек, ваш генератор водяных знаков не будет генерировать водяной знак, достаточно большой для запуска окна, пока он не увидит событие с отметкой времени не менее 2019-06-21T09: 43: 08.500.
...