Я пытаюсь агрегировать потоковые данные за каждый час (например, с 12:00 до 12:59 и с 01:00 до 01:59) в задании DataFlow / Apache Beam.
Ниже приведен мой вариант использования
Данные передаются из pubsub, имеют метку времени (дата заказа).Я хочу не считать ни одного заказа в каждый час, который я получаю, также я хочу разрешить задержку в 5 часов.Ниже приведен пример кода, который я использую
LOG.info("Start Running Pipeline");
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> directShipmentFeedData = pipeline.apply("Get Direct Shipment Feed Data", PubsubIO.readStrings().fromSubscription(directShipmentFeedSubscription));
PCollection<String> tibcoRetailOrderConfirmationFeedData = pipeline.apply("Get Tibco Retail Order Confirmation Feed Data", PubsubIO.readStrings().fromSubscription(tibcoRetailOrderConfirmationFeedSubscription));
PCollection<String> flattenData = PCollectionList.of(directShipmentFeedData).and(tibcoRetailOrderConfirmationFeedData)
.apply("Flatten Data from PubSub", Flatten.<String>pCollections());
flattenData
.apply(ParDo.of(new DataParse())).setCoder(SerializableCoder.of(SalesAndUnits.class))
// Adding Window
.apply(
Window.<SalesAndUnits>into(
SlidingWindows.of(Duration.standardMinutes(15))
.every(Duration.standardMinutes(1)))
)
// Data Enrich with Dimensions
.apply(ParDo.of(new DataEnrichWithDimentions()))
// Group And Hourly Sum
.apply(new GroupAndSumSales())
.apply(ParDo.of(new SQLWrite())).setCoder(SerializableCoder.of(SalesAndUnits.class));
pipeline.run();
LOG.info("Finish Running Pipeline");