Да, проблема в том, что в трансляции нет водяных знаков. (Но нет, не имеет значения, есть ли у KeyedBroadcastProcessFunction
метод onTimer или нет. Как только вы получите поток водяных знаков, они будут перетекать в окно независимо от того.)
Всякий раз, когда у оператора есть два или более входы - так в вашем случае, когда подключены inputStream
и configurationBroadcastStream
- водяной знак у этого оператора будет минимальным из водяных знаков от его входов. Поскольку широковещательный поток не имеет водяных знаков, он удерживает водяные знаки, предоставленные inputStream
.
. У меня есть пример , показывающий, как вы можете справиться с этим. Предполагая, что ваш широковещательный поток не должен иметь какой-либо информации о времени, вы можете реализовать средство извлечения меток времени и присваивания водяных знаков, которое эффективно уступает контроль водяных знаков другому потоку. Примерно так:
// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our config stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.
public static class ConfigStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return Watermark.MAX_WATERMARK;
}
@Override
public long extractTimestamp(String element, long previousElementTimestamp) {
return 0;
}
}