Раздвижные окна для медленных данных (большие интервалы) на Apache Beam - PullRequest
0 голосов
/ 29 мая 2018

Я работаю с набором данных Chicago Traffic Tracker , где новые данные публикуются каждые 15 минут.Когда доступны новые данные, они представляют записи, отключенные на 10-15 минут от «реального времени» ( пример , ищите _last_updt).

Например, в 00:20 я получаю данные с отметкой времени 00:10;в 00:35 я получаю с 00:20;в 00:50 получаю с 00:40.Таким образом, интервал, в течение которого я могу получать новые данные, «фиксирован» (каждые 15 минут), хотя интервал временных меток изменяется незначительно.

Я пытаюсь использовать эти данные в потоке данных (Apache Beam) и для этого яиграть с раздвижными окнами.Моя идея состоит в том, чтобы собрать и обработать 4 последовательных точки данных (4 x 15 минут = 60 минут) и в идеале обновить мой расчет суммы / средних, как только появится новая точка данных.Для этого я начал с кода:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15))) .     // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());

К сожалению, похоже, что когда я получаю новую точку данных из своего ввода, я не получаю новый (обновленный) результат от GroupByKey, которыйУ меня после.

Что-то не так с моими раздвижными окнами?Или я что-то упускаю?

Ответы [ 2 ]

0 голосов
/ 11 июня 2018

Итак, @Pablo (из моего понимания) дал правильный ответ.Но у меня были некоторые предложения, которые не помещались бы в комментарии.

Я хотел спросить, нужны ли вам раздвижные окна?Из того, что я могу сказать, фиксированные окна сделают эту работу за вас, а также будут проще в вычислительном отношении.Так как вы используете накапливающиеся панели, вам не нужно использовать скользящее окно, так как ваша следующая функция DoFn уже будет получать среднее значение из накопленных панелей.

Что касается кода, я внес изменения в логику раннего и позднего запуска.Я также предлагаю увеличить размер окна.Поскольку вы знаете, что данные поступают каждые 15 минут, вы должны закрывать окно через 15 минут, а не через 15 минут.Но вы также не хотите выбирать окно, которое в конечном итоге столкнется с кратными 15 (например, 20), потому что через 60 минут у вас будет та же проблема.Поэтому выберите число, равное 15-ти, например 19. Также допускайте поздние записи.

    PCollection<TrafficData> trafficData = input        
        .apply("MapIntoFixedWindows", Window.<TrafficData>into(
            FixedWindows.of(Duration.standardMinutes(19)) 
                        .triggering(AfterWatermark.pastEndOfWindow()
                            // fire the moment you see an element 
                            .withEarlyFirings(AfterPane.elementCountAtLeast(1))
                            //this line is optional since you already have a past end of window and a early firing. But just in case 
                            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
                        .withAllowedLateness(Duration.standardMinutes(60))
                        .accumulatingFiredPanes());

Дайте мне знать, если это решит вашу проблему!

РЕДАКТИРОВАТЬ

Итак, я не мог понять, как вы вычислили приведенный выше пример,поэтому я использую общий пример.Ниже приведена общая функция усреднения:

public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
  public static class Accum {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Accum createAccumulator() { return new Accum(); }

  @Override
  public Accum addInput(Accum accum, Integer input) {
      accum.sum += input;
      accum.count++;
      return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }
}

Чтобы запустить ее, вы должны добавить строку:

PCollection<Double> average = trafficData.apply(Combine.globally(new AverageFn()));

Поскольку вы в настоящее время с использованием накапливающих триггеров запуска, это будет самый простой способ кодирования решения проблемы.

ОДНАКО, если вы хотите использовать окно сбрасываемой панели огня, вам нужно будет использовать PCollectionView для храненияпредыдущее среднее и передать его в качестве бокового ввода для следующего, чтобы отслеживать значения.Это немного сложнее в кодировании, но определенно улучшит производительность, так как постоянная работа выполняется в каждом окне, в отличие от накопительного запуска.

Достаточно ли этого для вас, чтобы сгенерировать свою собственную функцию для отбрасывания окна пожарной панели?

0 голосов
/ 08 июня 2018

Одной из проблем может быть то, что водяной знак проходит мимо конца окна и отбрасывает все последующие элементы.Вы можете попробовать дать несколько минут после прохождения водяного знака:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15))) .     // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.standardMinutes(15))
        .accumulatingFiredPanes());

Дайте мне знать, если это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...