Dynami c Создание Flink Window путем чтения деталей из Кафки - PullRequest
1 голос
/ 20 января 2020

Скажем,

Сообщение Kafka содержит конфигурацию размера окна flink.

Я хочу прочитать сообщение от kafka и создать глобальное окно в flink.

Постановка задачи:

Можем ли мы справиться с вышеуказанным сценарием с помощью BroadcastStream?

Или

Любой другой подход, который будет поддерживать вышеприведенный случай?

1 Ответ

1 голос
/ 20 января 2020

API окон Flink не поддерживает динамическое изменение размеров окон.

Что вам нужно сделать, это реализовать собственное управление окнами с помощью функции процесса. В этом случае KeyedBroadcastProcessFunction, где конфигурация окна передается.

Вы можете проверить https://github.com/alpinegizmo/timing-explorer/blob/master/src/main/java/com/ververica/functions/PseudoWindow.java для примера того, как реализовать время windows с помощью KeyedProcessFunction (скопировано ниже):

public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
    // Keyed, managed state, with an entry for each window.
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // Get the timestamp for this timer and use it to look up the count for that window
        long ts = context.timestamp();
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
        out.collect(result);
        countInWindow.remove(timestamp);
    }
} 

...