Я пытаюсь создать приложение Kafka Streams, в котором я пытаюсь вычислить уникальные устройства для каждой платформы в пределах временного окна.
Класс событий
public class Event {
private String eventId;
private String deviceId;
private String platform;
private ZonedDateTime createdAt;
}
Мне нужно, чтобы временное окно соответствовало созданному событию. Поэтому я написал реализацию TimestampExtractor
, как показано ниже:
public class EventTimestampExtractor implements TimestampExtractor {
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
final Event event = (Event) record.value();
final ZonedDateTime eventCreationTime = event.getCreatedAt();
final long timestamp = eventCreationTime.toEpochSecond();
log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);
return timestamp;
}
}
Наконец, вот мой код потокового приложения:
final KStream<String, Event> eventStream = builder.stream("events_ingestion");
eventStream
.selectKey((key, event) -> {
final String platform = event.getPlatform();
final String deviceId = event.getDeviceId());
return String.join("::", platform, deviceId);
})
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
.count(Materialized.as(COUNT_STORE));
Когда яперенесите событие в тему event_ingestion
, я вижу, что отметка времени заносится в журналы приложений, а данные записываются в хранилище счетчиков.
Когда я перебираю хранилище счетчиков, я вижу следующее:
Key: [ANDROID::1@1539000000/1539900000], Value: 2
Хотя мое временное окно составляет 15 минут, ключ охватывает 10 дней.Если я удаляю свою реализацию TimestampExtractor из конфигурации потока (следовательно, возвращаюсь к времени обработки), ключ занимает 15 минут, как и ожидалось:
Key: [ANDROID::1@1539256500000/1539257400000], Value: 1
Что я здесь не так делаю?Есть идеи?