Пользовательский приемник Flink вызван, но без данных - PullRequest
0 голосов
/ 05 августа 2020
• 1000 steam, как показано ниже.
        DataStream<Alert> result = filteredMetrics
            .keyBy(
                new KeySelector<Tuple7<String, String, String, String, String, String, Object>, Tuple3<String, String, String>>() {
                    @Override
                    public  Tuple3<String, String, String> getKey(Tuple7<String, String, String, String, String, String, Object> in) throws Exception {
                        return Tuple3.of(in.f0, in.f1, in.f2);
                    }
            })
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .process(new ThresholdEvaluator());

        result.addSink(new AlertSink());

Когда я проверил журналы, я увидел, что приемник был вызван, но показывает пустую строку. ThresholdEvaluator выдает предупреждение, но показывает непустую строку.

2020-08-05 19:38:16,638 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,638 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-production-graph-service-account-toke644zm","period":"5m","isActive":true,"status":"new","firstSeen":1596656296638,"lastSeen":1596656296638,"count":1}
2020-08-05 19:38:16,640 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,640 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-staging-input-service-account-token-q57xf","period":"5m","isActive":true,"status":"new","firstSeen":1596656296640,"lastSeen":1596656296640,"count":1}
2020-08-05 19:38:16,643 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,643 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-restructure-repo-cmi-service-account-k76cd","period":"5m","isActive":true,"status":"new","firstSeen":1596656296643,"lastSeen":1596656296643,"count":1}
2020-08-05 19:38:16,646 INFO  io.name.package.AlertSink                      - Invoking sink for alert:
2020-08-05 19:38:16,646 INFO  io.name.package.ThresholdEvaluator             - Alert: {"thresholdID":"123123123","grouping":"svc-integrations-14361530-demo-slack-token","period":"5m","isActive":true,"status":"new","firstSeen":1596656296645,"lastSeen":1596656296645,"count":1}

Я что-то упустил?

Я также попытался добавить функцию карты между операторами ThresholdEvaluator и addSink. MapFunction, похоже, может получать объект Alert нормально, но не AlertSink.

        result.map(new MapFunction<Alert, Alert>() {
            @Override
            public Alert map(Alert value) {
                LOG.info(value.toString());
                return value;
            }
        }).addSink(new AlertSink());

(обновлено с дополнительным журналом)

1 Ответ

1 голос
/ 06 августа 2020

Причина в том, что в выходных данных журнала нет заполнителя для интерполяции значения - правильный синтаксис:

LOG.info("Invoking sink for alert: {}", alert.toString());

Разница между этим и

LOG.info("Invoking sink for alert: " + alert.toString());

заключается в том, что в в последнем случае конкатенация строк будет происходить каждый раз независимо от уровня журнала, и в первом случае будет интерполировано значение, если только уровень журнала не менее INFO.

...