Jet управляет временной меткой события за кулисами, она видна только процессорам.Например, при агрегировании окна будет использоваться временная метка.
Если вы хотите увидеть временную метку в коде, вам необходимо включить ее в свой тип элемента.Вы должны идти без отметок времени от источника, добавить отметку времени приема с помощью оператора map
и сообщить Jet об этом:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(...))
.withoutTimestamps()
.map(t -> tuple2(System.currentTimeMillis(), t))
.addTimestamps(Tuple2::f0, 2000)
.drainTo(Sinks.logger());
Я использовал allowedLag
из 2000 мс.Причина этого заключается в том, что временные метки будут добавлены в вершине ниже по течению от вершины, которая их присвоила.Там может происходить слияние потоков, и необходимо учитывать внутренний перекос.Например, он должен учитывать наибольшую ожидаемую паузу GC или сетевую задержку.См. Примечание в методе addTimestamps
.