У меня есть apache конвейер луча в java, который выглядит следующим образом:
pipeline
.apply("Read pubsub",PubsubIO.readStrings()
.fromTopic(inputTopic)
)
.apply(window)
.apply(ParDo.of(new OrderCodeKey()))
.apply(GroupByKey.<String, RawBsonDocument>create())
.apply(ParDo.of(new GetLastOrder()))
.apply(ParDo.of(new ShopIDKey()))
.apply(GroupByKey.create())
.apply(ParDo.of(new CountShopOrder()))
;
Описание проблемы: Мне нужно для потоковой передачи событий заказов из pubsub topi c, каждое событие имеет тип «обновление» или «вставка» и соответствует операции обновления / вставки в mongodb для порядка . В каждом событии есть несколько заметных полей:
- clusterTime -> время, когда происходит операция
- код заказа -> код заказа
- shop_id -> магазин которому принадлежит заказ.
Моя цель - вычислить общее количество заказов в магазине.
Деталь конвейера:
- Чтение из pubsub topi c
- Создание пары ключ-значение (для последующего GroupByKey) ключ - это код заказа, а значение - документ
public static class OrderCodeKey extends DoFn<String, KV<String, RawBsonDocument>>{
@ProcessElement
public void processElement (@Element String order, OutputReceiver<KV<String, RawBsonDocument>> out) throws ParseException {
// Parse JSON
RawBsonDocument document = RawBsonDocument.parse(order);
BsonDocument fullDocument = document.getDocument("fullDocument");
String orderCode = fullDocument.getString("order_code").getValue();
//Integer orderCode = 0;
out.output(KV.of(orderCode, document));
}
}
- GroupByKey ( сгруппировать по коду заказа)
- Объединить события с одним и тем же кодом заказа, чтобы получить последний заказ (тот, у которого самый большой clusterTime)
public static class GetLastOrder extends DoFn<KV<String, Iterable<RawBsonDocument>>, KV<String, BsonDocument>>{
@ProcessElement
public void processElement (@Element KV<String, Iterable<RawBsonDocument>> orders, OutputReceiver<KV<String, BsonDocument>> out) throws ParseException, java.text.ParseException {
List<RawBsonDocument> docs = Lists.newArrayList(orders.getValue());
Collections.sort(docs, new Comparator<RawBsonDocument>(){
@Override
public int compare(RawBsonDocument o1, RawBsonDocument o2) {
BsonTimestamp t1 = o1.get("clusterTime").asTimestamp();
BsonTimestamp t2 = o2.get("clusterTime").asTimestamp();
return t2.compareTo(t1); // sort desc
}
});
RawBsonDocument document = docs.iterator().next();
out.output(KV.of(orders.getKey(), document.getDocument("fullDocument")));
}
- Создать новый ключ (ключ = shop_id ) для последующего GroupByKey
public static class ShopIDKey extends DoFn<KV<String, BsonDocument>, KV<Long, BsonDocument>>{
@ProcessElement
public void processElement (@Element KV<String, BsonDocument> order, OutputReceiver<KV<Long, BsonDocument>> out) throws ParseException {
Long shopId = Long.valueOf(order.getValue().getInt32("shop_id").getValue());
out.output(KV.of(shopId, order.getValue()));
}
}
- GroupByKey (группировка по shop_id)
- Подсчет количества заказов в магазине
public static class CountShopOrder extends DoFn<KV<Long, Iterable<BsonDocument>>, Shop>{
@ProcessElement
public void processElement (@Element KV<Long, Iterable<BsonDocument>> orders, OutputReceiver<Shop> out) throws ParseException, java.text.ParseException {
List<BsonDocument> docs = Lists.newArrayList(orders.getValue());
Long countOrder = Long.valueOf(docs.size());
}
}
In последний шаг, Я предполагаю, что входные итерируемые ордера будут содержать только уникальные ордера (потому что после t он GroupByKey-GetLastOrder, сохраняется только последнее событие). Однако при отладке я получил событий с тем же порядковым кодом , который я уже сократил с помощью GetLastOrder ParDo. Вот окно и триггер, который я использую:
Window<String> window = Window.<String>into(
FixedWindows.of(Duration.standardMinutes(60))
)
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
)
.withAllowedLateness(Duration.standardSeconds(0))
.accumulatingFiredPanes();
Буду признателен за любые комментарии / помощь по этому вопросу. Заранее спасибо!