Тимоти,
Чтобы получить метку времени записи, вы можете использовать операцию transformValues()
. Предоставляемый вами ValuesTransformer
имеет доступ к ProcessorContext
, и вы можете вызвать ProcessorContex.timestamp()
в методе ValueTransformer.transform()
. Если отметка времени находится в требуемом диапазоне, верните запись, в противном случае верните ноль. Затем добавьте filter()
после transformValues()
, чтобы удалить отклоненные записи.
Вот пример, который я думаю будет работать
class GroupByTimestampExample {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
// You need to update the the time fields these are just placeholders
long earliest = Instant.now().toEpochMilli();
long latest = Instant.now().toEpochMilli() + (60 * 60 * 1000);
final ValueTransformerSupplier<String, String> valueTransformerSupplier = new TimeFilteringTransformer(earliest, latest);
final KTable<String, Long> voteTable = builder.<String, String>stream("topic")
.transformValues(valueTransformerSupplier)
.filter((k, v) -> v != null)
.groupByKey()
.count();
}
static final class TimeFilteringTransformer implements ValueTransformerSupplier<String, String> {
private final long earliest;
private final long latest;
public TimeFilteringTransformer(final long earliest, final long latest) {
this.earliest = earliest;
this.latest = latest;
}
@Override
public ValueTransformer<String, String> get() {
return new ValueTransformer<String, String>() {
private ProcessorContext processorContext;
@Override
public void init(ProcessorContext context) {
processorContext = context;
}
@Override
public String transform(String value) {
long ts = processorContext.timestamp();
if (ts >= earliest && ts <= latest) {
return value;
}
return null;
}
@Override
public void close() {
}
};
}
}
}
Позвольте мне знать, как это происходит.