Мониторинг в реальном времени с использованием Apache Beam - PullRequest
0 голосов
/ 03 февраля 2019

Я бы хотел выполнить следующее с помощью Apache Beam:

вычислять каждые 5 секунд события, которые читаются из pubsub в последнюю минуту

Цельдолжен иметь представление в реальном времени о скорости поступления данных. Затем это может быть расширено до более сложных вариантов использования.

После поиска я не нашел способа решить эту, казалось бы, простую проблему,Вещи, которые не работают:

  • глобальное окно + повторные триггеры (триггеры не срабатывают, когда нет ввода)
  • скользящее окно + без значений по умолчанию (не позволяет пустым окнам появлятьсяпо-видимому)

Есть предложения по решению этой проблемы?

1 Ответ

0 голосов
/ 06 февраля 2019

Как уже обсуждалось, Beam не генерирует данные для пустых окон.В дополнение к причинам, приведенным Руи Вангом, мы можем добавить проблему того, как последние этапы будут обрабатывать эти пустые панели.

В любом случае, описанный вами конкретный вариант использования - мониторинг скользящего числа сообщений - должен быть возможен при некоторой работе, даже если показатель в конечном итоге упадет до нуля.Одной из возможностей было бы опубликовать устойчивое количество фиктивных сообщений, которые бы продвигали водяной знак и запускали панели, но затем отфильтровывались внутри конвейера.Проблема этого подхода заключается в том, что источник публикации необходимо адаптировать, и это не всегда может быть удобным / возможным.Другой будет включать в себя создание этих поддельных данных в качестве другого ввода и совместную их группировку с основным потоком.Преимущество состоит в том, что все может быть сделано в потоке данных без необходимости настройки источника или приемника.Для иллюстрации приведу пример.

Входы разделены на два потока.Для фиктивного я использовал GenerateSequence для создания нового элемента каждые 5 секунд.Затем я создаю окно PCollection (стратегия управления окнами должна быть совместима с таковой для основного потока, поэтому я буду использовать ее).Затем я сопоставляю элемент с парой ключ-значение, где значение равно 0 (мы могли бы использовать другие значения, поскольку мы знаем, из какого потока поступает элемент, но я хочу доказать, что фиктивные записи не учитываются).

PCollection<KV<String,Integer>> dummyStream = p
    .apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
    .apply("Window Messages - Dummy", Window.<Long>into(
            ...
    .apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            c.output(KV.of("num_messages", 0));
        }
    }));

Для основного потока, который читает из Pub / Sub, я сопоставляю каждую запись со значением 1. Позже я добавлю все те же, что и в типичных примерах подсчета слов, используя этапы сокращения карты.

PCollection<KV<String,Integer>> mainStream = p
    .apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
    .apply("Window Messages - Data", Window.<String>into(
            ...
    .apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            c.output(KV.of("num_messages", 1));
        }
    }));

Затем нам нужно присоединиться к ним, используя CoGroupByKey (я использовал тот же ключ num_messages для группового подсчета).Этот этап будет выводить результаты, когда один из двух входов имеет элементы, поэтому здесь разблокируется основная проблема (пустые окна без сообщений Pub / Sub).

final TupleTag<Integer> dummyTag = new TupleTag<>();
final TupleTag<Integer> dataTag = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
        .and(dataTag, mainStream).apply(CoGroupByKey.<String>create());

Наконец, мы добавляем все, чтобы получитьобщее количество сообщений для окна.Если из dataTag нет элементов, то по умолчанию сумма будет равна 0.

public void processElement(ProcessContext c, BoundedWindow window) {
    Integer total_sum = new Integer(0);

    Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
    for (Integer val : dataTagVal) {
        total_sum += val;
    }

    LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
}

Это должно привести к чему-то вроде: enter image description here

Обратите внимание, что результаты из разных окон могут быть неупорядоченными (это может произойти в любом случае при записи в BigQuery), и я не играл с настройками окна для оптимизации примера.

Полный код:

public class EmptyWindows {

    private static final Logger LOG = LoggerFactory.getLogger(EmptyWindows.class);

    public static interface MyOptions extends PipelineOptions {
        @Description("Input topic")
        String getInput();

        void setInput(String s);
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline p = Pipeline.create(options);

        String topic = options.getInput();

        PCollection<KV<String,Integer>> mainStream = p
            .apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
            .apply("Window Messages - Data", Window.<String>into(
                    SlidingWindows.of(Duration.standardMinutes(1))
                            .every(Duration.standardSeconds(5)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Duration.ZERO)
                    .accumulatingFiredPanes())
            .apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    //LOG.info("New data element in main output");
                    c.output(KV.of("num_messages", 1));
                }
            }));

        PCollection<KV<String,Integer>> dummyStream = p
            .apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
            .apply("Window Messages - Dummy", Window.<Long>into(
                    SlidingWindows.of(Duration.standardMinutes(1))
                            .every(Duration.standardSeconds(5)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Duration.ZERO)
                    .accumulatingFiredPanes())
            .apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    //LOG.info("New dummy element in main output");
                    c.output(KV.of("num_messages", 0));
                }
            }));

        final TupleTag<Integer> dummyTag = new TupleTag<>();
        final TupleTag<Integer> dataTag = new TupleTag<>();

        PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
                .and(dataTag, mainStream).apply(CoGroupByKey.<String>create());

        coGbkResultCollection
                .apply("Log results", ParDo.of(new DoFn<KV<String, CoGbkResult>, Void>() {

                    @ProcessElement
                    public void processElement(ProcessContext c, BoundedWindow window) {
                        Integer total_sum = new Integer(0);

                        Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
                        for (Integer val : dataTagVal) {
                            total_sum += val;
                        }

                        LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
                    }
                }));

        p.run();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...