Обновление задания потоковой передачи данных с помощью окна сеанса и сайдинга, встроенного в DF - PullRequest
0 голосов
/ 26 мая 2020

В моем случае использования я выполняю сеанс, а также скользящее окно внутри задания потока данных. Итак, в основном мое время скользящего окна составляет 10 часов, а время скольжения - 4 минуты. Поскольку я применяю группировку и выполняю функцию max поверх этого, через каждые 3 мин. Окно будет запускать панель, и она будет go в окне сеанса с запуском logi c на нем. Ниже приведен код того же.

Window<Map<String, String>> windowMap = Window.<Map<String, String>>into(
                SlidingWindows.of(Duration.standardHours(10)).every(Duration.standardMinutes(4)));

        Window<Map<String, String>> windowSession = Window
                .<Map<String, String>>into(Sessions.withGapDuration(Duration.standardHours(10))).discardingFiredPanes()
                .triggering(Repeatedly
                        .forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
                .withAllowedLateness(Duration.standardSeconds(10));

Я хотел бы добавить регистратор на некоторых этапах отладки, поэтому я пытаюсь обновить текущее задание потоковой передачи, используя приведенный ниже код:

options.setRegion("asia-east1");
options.setUpdate(true);
options.setStreaming(true);

Итак, раньше у меня было около 10 тыс. Данных, и я обновил существующий конвейер, используя указанную выше конфигурацию, и теперь я не могу видеть столько данных на этапах обновленного задания DF. Так что помогите мне понять, сохраняет ли он предыдущие данные задания или нет, так как я не вижу количество предыдущих шагов DF в обновленном задании.

...