Все, что я читаю, говорит, что это должно работать: мне нужно, чтобы мой слушатель запускался каждые 10 секунд с событиями. Теперь я получаю каждое событие, это триггер слушателя. Что мне не хватает? Базовые требования c - создавать сводную статистику каждые 10 секунд. В идеале я просто хочу закачать данные в среду выполнения. Итак, в этом примере я ожидал бы дамп из 10 записей, каждые 10 секунд
class StreamTest {
private final Configuration configuration = new Configuration();
private final EPRuntime runtime;
private final CompilerArguments args = new CompilerArguments();
private final EPCompiler compiler;
public DatadogApplicationTests() {
configuration.getCommon().addEventType(CommonLogEntry.class);
runtime = EPRuntimeProvider.getRuntime(this.getClass().getSimpleName(), configuration);
args.getPath().add(runtime.getRuntimePath());
compiler = EPCompilerProvider.getCompiler();
}
@Test
void testDisplayStatsEvery10S() throws Exception{
// Display stats every 10s about the traffic during those 10s:
EPCompiled compiled = compiler.compile("select * from CommonLogEntry.win:time(10)", args);
runtime.getDeploymentService().deploy(compiled).getStatements()[0].addListener(
(old, newEvents, epStatement, epRuntime) ->
Arrays.stream(old).forEach(e -> System.out.format("%s: received %n", LocalTime.now()))
);
new BufferedReader(new InputStreamReader(this.getClass().getResourceAsStream("/access.log"))).lines().map(CommonLogEntry::new).forEachOrdered(e -> {
runtime.getEventService().sendEventBean(e, e.getClass().getSimpleName());
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException ex) {
System.err.println(ex);
}
});
}
}
который в настоящее время выводит каждую секунду, что соответствует сну в моем потоке:
11:00:54.676: received
11:00:55.684: received
11:00:56.689: received
11:00:57.694: received
11:00:58.698: received
11:00:59.700: received