Apache Балл GroupByKey дублировать события - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть apache конвейер луча в java, который выглядит следующим образом:

        pipeline
        .apply("Read pubsub",PubsubIO.readStrings()
                .fromTopic(inputTopic)
        )
        .apply(window)
        .apply(ParDo.of(new OrderCodeKey()))
        .apply(GroupByKey.<String, RawBsonDocument>create())
        .apply(ParDo.of(new GetLastOrder()))
        .apply(ParDo.of(new ShopIDKey()))
        .apply(GroupByKey.create())
        .apply(ParDo.of(new CountShopOrder()))
        ;

Описание проблемы: Мне нужно для потоковой передачи событий заказов из pubsub topi c, каждое событие имеет тип «обновление» или «вставка» и соответствует операции обновления / вставки в mongodb для порядка . В каждом событии есть несколько заметных полей:

  • clusterTime -> время, когда происходит операция
  • код заказа -> код заказа
  • shop_id -> магазин которому принадлежит заказ.

Моя цель - вычислить общее количество заказов в магазине.

Деталь конвейера:

  • Чтение из pubsub topi c
  • Создание пары ключ-значение (для последующего GroupByKey) ключ - это код заказа, а значение - документ
  public static class OrderCodeKey extends DoFn<String, KV<String, RawBsonDocument>>{
       @ProcessElement
       public void processElement (@Element String order, OutputReceiver<KV<String, RawBsonDocument>> out) throws ParseException {
           // Parse JSON
           RawBsonDocument document = RawBsonDocument.parse(order);
           BsonDocument fullDocument = document.getDocument("fullDocument");
           String orderCode = fullDocument.getString("order_code").getValue();
           //Integer orderCode = 0;
           out.output(KV.of(orderCode, document));
       }
   }
  • GroupByKey ( сгруппировать по коду заказа)
  • Объединить события с одним и тем же кодом заказа, чтобы получить последний заказ (тот, у которого самый большой clusterTime)
public static class GetLastOrder extends DoFn<KV<String, Iterable<RawBsonDocument>>, KV<String, BsonDocument>>{
        @ProcessElement
        public void processElement (@Element KV<String, Iterable<RawBsonDocument>> orders, OutputReceiver<KV<String, BsonDocument>> out) throws ParseException, java.text.ParseException {
            List<RawBsonDocument> docs = Lists.newArrayList(orders.getValue());
            Collections.sort(docs, new Comparator<RawBsonDocument>(){
                @Override
                public int compare(RawBsonDocument o1, RawBsonDocument o2) {
                    BsonTimestamp t1 = o1.get("clusterTime").asTimestamp();
                    BsonTimestamp t2 = o2.get("clusterTime").asTimestamp();
                    return t2.compareTo(t1); // sort desc
                }
            });
            RawBsonDocument document = docs.iterator().next();
            out.output(KV.of(orders.getKey(), document.getDocument("fullDocument")));
        }
  • Создать новый ключ (ключ = shop_id ) для последующего GroupByKey
public static class ShopIDKey extends DoFn<KV<String, BsonDocument>, KV<Long, BsonDocument>>{
        @ProcessElement
        public void processElement (@Element KV<String, BsonDocument> order, OutputReceiver<KV<Long, BsonDocument>> out) throws ParseException {
        Long shopId =  Long.valueOf(order.getValue().getInt32("shop_id").getValue());
        out.output(KV.of(shopId, order.getValue()));
        }
    }
  • GroupByKey (группировка по shop_id)
  • Подсчет количества заказов в магазине
public static class CountShopOrder extends DoFn<KV<Long, Iterable<BsonDocument>>, Shop>{
        @ProcessElement
        public void processElement (@Element KV<Long, Iterable<BsonDocument>> orders, OutputReceiver<Shop> out) throws ParseException, java.text.ParseException {
            List<BsonDocument> docs = Lists.newArrayList(orders.getValue());
            Long countOrder = Long.valueOf(docs.size());
        }
    }

In последний шаг, Я предполагаю, что входные итерируемые ордера будут содержать только уникальные ордера (потому что после t он GroupByKey-GetLastOrder, сохраняется только последнее событие). Однако при отладке я получил событий с тем же порядковым кодом , который я уже сократил с помощью GetLastOrder ParDo. Вот окно и триггер, который я использую:

Window<String> window = Window.<String>into(
                FixedWindows.of(Duration.standardMinutes(60))
                )
                .triggering(
                        AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                )
                .withAllowedLateness(Duration.standardSeconds(0))
                .accumulatingFiredPanes();

Буду признателен за любые комментарии / помощь по этому вопросу. Заранее спасибо!

1 Ответ

0 голосов
/ 30 апреля 2020

Если вам не нужны промежуточные результаты, а требуется просто почасовая сводка уникальных заказов для каждого магазина, ранний запуск не требуется.

Когда вы используете .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()), каждая панель будет обрабатывать только подмножество данных после первые данные поступают на панель. С .accumulatingFiredPanes() вы увидите накопленный результат из предыдущей панели, поэтому одни и те же данные появляются несколько раз на разных панелях для одного и того же окна.

Пожалуйста, смотрите https://beam.apache.org/documentation/programming-guide/#setting -a-trigger для деталей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...