Flink EventTime Обработка Водяной знак всегда приходит как -9223372036854725808 - PullRequest
0 голосов
/ 02 марта 2020

Я пытаюсь использовать функцию процесса для некоторой обработки множества событий. Я использую время события и ключевой поток. Проблема, с которой я сталкиваюсь, заключается в том, что значение водяного знака всегда отображается как 9223372036854725808. Я поставил оператор печати для отладки, и он выглядит так:

отметка времени ------ 1583128014000 extractTimestamp 1583128014000 currentwatermark ----- 9223372036854775808

отметка времени ------ 1583128048000 извлеченная отметка времени 1583128048000 currentwatermark ----- 9223372036854775808

отметка времени ------ 1583128089000 извлеченная отметка времени 1583128089000 currentwatermark ----- 9223372036854775808 * 100 1008 * Так что метка времени и извлеченная метка времени изменяется, но водяной знак не обновляется. Так что никакая запись не попадает в очередь, так как context.timestamp никогда не меньше водяного знака.

DataStream<GenericRecord> dataStream = env.addSource(searchConsumer).name("search_list_keyless");
        DataStream<GenericRecord> dataStreamWithWaterMark =  dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

       try {
            dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
                StringBuilder builder = new StringBuilder();
                builder.append(record.get("session_id"));
                builder.append(record.get("user_id"));
                return builder.toString();
            }).process(new MatchFunction()).print();
        }
        catch (Exception e){
            e.printStackTrace();
        }
        env.execute("start session process");

    }

    public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord>  {
        @Override
        public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
            long timestamp = (long) record.get("event_ts");
            System.out.println("timestamp------"+ timestamp);
            return timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
            // simply emit a watermark with every event
            System.out.println("extractedTimestamp "+extractedTimestamp);
            return new Watermark(extractedTimestamp - 30000);
        }
 }

Это код для processFunction ....

public class MatchFunction extends KeyedProcessFunction<String, GenericRecord, Object> {

    private ValueState<Tuple2<Long, PriorityQueue<GenericRecord>>> queueState = null;

    @Override
    public void open(Configuration config) throws Exception {
        System.out.println("open");
        ValueStateDescriptor<Tuple2<Long, PriorityQueue<GenericRecord>>> descriptor = new ValueStateDescriptor<>(
                "sorted-events", TypeInformation.of(new TypeHint<Tuple2<Long, PriorityQueue<GenericRecord>>>() {
        })
        );
        queueState = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
        Tuple2<Long, PriorityQueue<GenericRecord>> tuple = queueState.value();

        PriorityQueue<GenericRecord> records = tuple.f1;

    }

    @Override
    public void processElement(GenericRecord record, Context context, Collector<Object> collector) throws Exception {

        TimerService timerService = context.timerService();
        System.out.println("currentwatermark----"+ timerService.currentWatermark());
        if (context.timestamp() > timerService.currentWatermark()) {

            Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
            PriorityQueue<GenericRecord> queue = queueval.f1;
            long startTime = queueval.f0;
            System.out.println("starttime----"+ startTime);

            if (queue == null) {
                queue = new PriorityQueue<>(10, new TimeStampComprator());
                startTime = (long) record.get("event_ts");
            }
            queueState.update(new Tuple2<>(startTime, queue));
            timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
        }
    }

}

1 Ответ

0 голосов
/ 02 марта 2020

Вот возможное объяснение того, чем вы поделились:

TimestampsAndPunctuatedWatermarksOperator вызывает extractTimestamp, прежде чем вызывает checkAndGetNextWatermark для данной записи. Это означает, что при первом вызове processElement в вашем MatchFunction в каждой задаче (параллельном экземпляре) текущим водяным знаком будет Long.MIN_VALUE (то есть -9223372036854775808).

Если ваш параллелизм достаточно большой, что может объяснить видение

currentwatermark-----9223372036854775808

несколько раз.

...