Apache Beam: водяной знак RabbitMqIO не продвигается - PullRequest
0 голосов
/ 18 апреля 2019

Мне нужна помощь, пожалуйста. Я пытаюсь использовать Apache Beam с источником RabbitMqIO (версия 2.11.0) и триггером AfterWatermark.pastEndOfWindow. Похоже, что водяной знак RabbitMqIO не продвигается и остается прежним. Из-за этого поведения триггер AfterWatermark не работает. Когда я использую другие триггеры, которые не учитывают водяные знаки, это работает (например: AfterProcessingTime, AfterPane) Ниже мой код, спасибо:

public class Main {

private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

// Window declaration with trigger
public static Window<RabbitMqMessage> window() {
    return Window. <RabbitMqMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes();
}

public static void main(String[] args) {
    SpringApplication.run(Main.class, args);

    // pipeline creation
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Using RabbitMqIO
    PCollection<RabbitMqMessage> messages = pipeline
            .apply(RabbitMqIO.read().withUri("amqp://guest:guest@localhost:5672").withQueue("test"));

    PCollection<RabbitMqMessage> windowedData = messages.apply("Windowing", window());

    windowedData.apply(Combine.globally(new MyCombine()).withoutDefaults());

    pipeline.run();
    }

}

class MyCombine implements SerializableFunction<Iterable<RabbitMqMessage>,   RabbitMqMessage> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyCombineKafka.class);

/**
 * 
 */
private static final long serialVersionUID = 6143898367853230506L;

@Override
public RabbitMqMessage apply(Iterable<RabbitMqMessage> input) {
    LOGGER.info("After trigger launched");
    return null;
}

}
...