Окно данных ежечасно (по часовой стрелке) в Apache Beam - PullRequest
0 голосов
/ 06 июня 2018

Я пытаюсь агрегировать потоковые данные за каждый час (например, с 12:00 до 12:59 и с 01:00 до 01:59) в задании DataFlow / Apache Beam.

Ниже приведен мой вариант использования

Данные передаются из pubsub, имеют метку времени (дата заказа).Я хочу не считать ни одного заказа в каждый час, который я получаю, также я хочу разрешить задержку в 5 часов.Ниже приведен пример кода, который я использую

    LOG.info("Start Running Pipeline");
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    PCollection<String>  directShipmentFeedData = pipeline.apply("Get Direct Shipment Feed Data", PubsubIO.readStrings().fromSubscription(directShipmentFeedSubscription));
    PCollection<String>  tibcoRetailOrderConfirmationFeedData = pipeline.apply("Get Tibco Retail Order Confirmation Feed Data", PubsubIO.readStrings().fromSubscription(tibcoRetailOrderConfirmationFeedSubscription));

    PCollection<String> flattenData = PCollectionList.of(directShipmentFeedData).and(tibcoRetailOrderConfirmationFeedData)
            .apply("Flatten Data from PubSub", Flatten.<String>pCollections());

    flattenData
        .apply(ParDo.of(new DataParse())).setCoder(SerializableCoder.of(SalesAndUnits.class))

        // Adding Window

        .apply(
                Window.<SalesAndUnits>into(
                            SlidingWindows.of(Duration.standardMinutes(15))
                            .every(Duration.standardMinutes(1)))
                            )

        // Data Enrich with Dimensions
        .apply(ParDo.of(new DataEnrichWithDimentions()))

        // Group And Hourly Sum
        .apply(new GroupAndSumSales())

        .apply(ParDo.of(new SQLWrite())).setCoder(SerializableCoder.of(SalesAndUnits.class));
    pipeline.run();
    LOG.info("Finish Running Pipeline");

1 Ответ

0 голосов
/ 06 июня 2018

Я бы использовал окно с вашими требованиями.Что-то вроде

Window.into(
  FixedWindows.of(Duration.standardHours(1))
).withAllowedLateness(Duration.standardHours(5)))

Возможно, за которым следует count, поскольку, как я понял, вам нужно.

Надеюсь, это поможет

...