Мне нужна помощь, пожалуйста. Я пытаюсь использовать Apache Beam с источником RabbitMqIO (версия 2.11.0) и триггером AfterWatermark.pastEndOfWindow. Похоже, что водяной знак RabbitMqIO не продвигается и остается прежним. Из-за этого поведения триггер AfterWatermark не работает. Когда я использую другие триггеры, которые не учитывают водяные знаки, это работает (например: AfterProcessingTime, AfterPane) Ниже мой код, спасибо:
public class Main {
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
// Window declaration with trigger
public static Window<RabbitMqMessage> window() {
return Window. <RabbitMqMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes();
}
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
// pipeline creation
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
// Using RabbitMqIO
PCollection<RabbitMqMessage> messages = pipeline
.apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:5672").withQueue("test"));
PCollection<RabbitMqMessage> windowedData = messages.apply("Windowing", window());
windowedData.apply(Combine.globally(new MyCombine()).withoutDefaults());
pipeline.run();
}
}
class MyCombine implements SerializableFunction<Iterable<RabbitMqMessage>, RabbitMqMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyCombineKafka.class);
/**
*
*/
private static final long serialVersionUID = 6143898367853230506L;
@Override
public RabbitMqMessage apply(Iterable<RabbitMqMessage> input) {
LOGGER.info("After trigger launched");
return null;
}
}