Сначала Вам нужно реализовать CustomTimestampPolicy, расширив TimestampPolicy<KeyT,ValueT>
Например:
public class CustomFieldTimePolicy extends TimestampPolicy<String, Foo> {
protected Instant currentWatermark;
public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Foo> record) {
currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
return currentWatermark;
}
@Override
public Instant getWatermark(PartitionContext ctx) {
return currentWatermark;
}
}
Затем вам нужно передать свой пользовательский TimestampPolicy, когдаВы настраиваете свой источник KafkaIO, используя функциональный интерфейс TimestampPolicyFactory
KafkaIO.<String, Foo>read().withBootstrapServers("http://localhost:9092")
.withTopic("foo")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Foo.class)) //if you use avro
.withTimestampPolicyFactory((tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))
.updateConsumerProperties(kafkaProperties))
Эта строка отвечает за создание новой метки времениPolicy, прохождения связанного раздела и предыдущего контрольного пункта с водяным знаком, см. документацию
withTimestampPolicyFactory(tp, previousWatermark) -> new CustomFieldTimePolicy(previousWatermark))