• 1000 steam, как показано ниже.
DataStream<Alert> result = filteredMetrics
.keyBy(
new KeySelector<Tuple7<String, String, String, String, String, String, Object>, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> getKey(Tuple7<String, String, String, String, String, String, Object> in) throws Exception {
return Tuple3.of(in.f0, in.f1, in.f2);
}
})
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new ThresholdEvaluator());
result.addSink(new AlertSink());
Когда я проверил журналы, я увидел, что приемник был вызван, но показывает пустую строку. ThresholdEvaluator выдает предупреждение, но показывает непустую строку.
2020-08-05 19:38:16,638 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,638 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-production-graph-service-account-toke644zm","period":"5m","isActive":true,"status":"new","firstSeen":1596656296638,"lastSeen":1596656296638,"count":1}
2020-08-05 19:38:16,640 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,640 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-staging-input-service-account-token-q57xf","period":"5m","isActive":true,"status":"new","firstSeen":1596656296640,"lastSeen":1596656296640,"count":1}
2020-08-05 19:38:16,643 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,643 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-restructure-repo-cmi-service-account-k76cd","period":"5m","isActive":true,"status":"new","firstSeen":1596656296643,"lastSeen":1596656296643,"count":1}
2020-08-05 19:38:16,646 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,646 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-integrations-14361530-demo-slack-token","period":"5m","isActive":true,"status":"new","firstSeen":1596656296645,"lastSeen":1596656296645,"count":1}
Я что-то упустил?
Я также попытался добавить функцию карты между операторами ThresholdEvaluator и addSink. MapFunction, похоже, может получать объект Alert нормально, но не AlertSink.
result.map(new MapFunction<Alert, Alert>() {
@Override
public Alert map(Alert value) {
LOG.info(value.toString());
return value;
}
}).addSink(new AlertSink());
(обновлено с дополнительным журналом)