Что именно используется в withIngestionTimestamps () в Hazelcast Jet Pipeline? - PullRequest
0 голосов
/ 08 мая 2019

Я запускаю конвейер, источник из темы Kafka и опускается до IMap.Каждый раз, когда я пишу один, я сталкиваюсь с методами withIngestionTimestamps() и withoutTimestamps() и удивляюсь, как они полезны?Я все понимаю, что источник добавляет время к событию.Вопрос в том, как мне его использовать?Я не вижу какого-либо метода для извлечения метки времени из события?

Мой IMap может заполняться дублирующимися значениями.Если бы я мог использовать метод withIngestionTimestamps () для оценки последней записи и удаления старой?

Ответы [ 2 ]

1 голос
/ 09 мая 2019

Jet управляет временной меткой события за кулисами, она видна только процессорам.Например, при агрегировании окна будет использоваться временная метка.

Если вы хотите увидеть временную метку в коде, вам необходимо включить ее в свой тип элемента.Вы должны идти без отметок времени от источника, добавить отметку времени приема с помощью оператора map и сообщить Jet об этом:

Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(...))
 .withoutTimestamps()
 .map(t -> tuple2(System.currentTimeMillis(), t))
 .addTimestamps(Tuple2::f0, 2000)
 .drainTo(Sinks.logger());

Я использовал allowedLag из 2000 мс.Причина этого заключается в том, что временные метки будут добавлены в вершине ниже по течению от вершины, которая их присвоила.Там может происходить слияние потоков, и необходимо учитывать внутренний перекос.Например, он должен учитывать наибольшую ожидаемую паузу GC или сетевую задержку.См. Примечание в методе addTimestamps.

1 голос
/ 08 мая 2019

Jet использует метки времени события, чтобы правильно применить управление окнами.Он должен решить, какое событие принадлежит какому окну и когда пришло время закрыть окно и выдать его агрегированный результат.Временные метки присутствуют в событиях в виде метаданных и не предоставляются пользователю.

Однако, если вы хотите применить свою логику, которая относится ко времени настенных часов, вы всегда можете вызвать System.currentTimeMillis(), чтобы сравнить его с отметкой времени, явно сохраненной в значении IMap.Это было бы эквивалентно использованию времени обработки , которое очень похоже на время приема , которое применяется Jet.Время проглатывания - это просто время обработки, действительное в исходной вершине конвейера, поэтому применение времени обработки в вершине приемника немного отличается от этого и имеет те же практические свойства.

...