Как запустить нисходящий метод onEventTime () при использовании шаблона BroadcastState? - PullRequest
1 голос
/ 27 февраля 2020

Я использую конвейер следующим образом:

inputStream.keyBy(<keyMapper>).
connect(configurationBroadcastStream).
process(new KeyedBroadcastProcessFunction<...>() {
     processBroadcastElement(...){...}
     processElement(...){...}
     }).
keyBy(<keyMapper>). // have to key output of process() again
window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
trigger(new CustomTrigger()).
process(new CustomProcessWindowFn())

В CustomTrigger() я регистрирую eventTimeTimer(), который сработает, чтобы указать конец моего окна. Проблема в том, что метод onEventTime() не вызывается никогда , даже когда:

  • Я обеспечил env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • Используя ascendingTimestampExtractor(), я отправили событие, которое определенно отодвинул водяной знак достаточно далеко, чтобы eventTimeTimer() запустился.

Чего мне не хватает? Имеет ли это какое-то отношение к отсутствующим водяным знакам и методу onTimer() для KeyedBroadcastProcessFunction? Я подозреваю, что из-за комментария Дэвида Андерсона в этом ответе:

добавить специальные поддельные водяные знаки для не вещательного потока (установите значение Watermark.MAX_WATERMARK)

и тот факт, что я не реализовал метод, названный в Timer . Однако, если это действительно так, я не понимаю, как это будет иметь отношение к нисходящему триггеру. Спасибо.

Редактировать: полный пример этого сценария здесь .

1 Ответ

1 голос
/ 27 февраля 2020

Да, проблема в том, что в трансляции нет водяных знаков. (Но нет, не имеет значения, есть ли у KeyedBroadcastProcessFunction метод onTimer или нет. Как только вы получите поток водяных знаков, они будут перетекать в окно независимо от того.)

Всякий раз, когда у оператора есть два или более входы - так в вашем случае, когда подключены inputStream и configurationBroadcastStream - водяной знак у этого оператора будет минимальным из водяных знаков от его входов. Поскольку широковещательный поток не имеет водяных знаков, он удерживает водяные знаки, предоставленные inputStream.

. У меня есть пример , показывающий, как вы можете справиться с этим. Предполагая, что ваш широковещательный поток не должен иметь какой-либо информации о времени, вы можете реализовать средство извлечения меток времени и присваивания водяных знаков, которое эффективно уступает контроль водяных знаков другому потоку. Примерно так:

// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our config stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.

public static class ConfigStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return 0;
    }
}
...