Управление потоками Кафки с помощью TimestampExtractor - PullRequest
0 голосов
/ 11 октября 2018

Я пытаюсь создать приложение Kafka Streams, в котором я пытаюсь вычислить уникальные устройства для каждой платформы в пределах временного окна.

Класс событий

public class Event {
    private String eventId;
    private String deviceId;
    private String platform;
    private ZonedDateTime createdAt;
}

Мне нужно, чтобы временное окно соответствовало созданному событию. Поэтому я написал реализацию TimestampExtractor, как показано ниже:

public class EventTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Event event = (Event) record.value();
        final ZonedDateTime eventCreationTime = event.getCreatedAt();
        final long timestamp = eventCreationTime.toEpochSecond();

        log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);

        return timestamp;
    }
}

Наконец, вот мой код потокового приложения:

final KStream<String, Event> eventStream = builder.stream("events_ingestion");

eventStream
    .selectKey((key, event) -> {
        final String platform = event.getPlatform();
        final String deviceId = event.getDeviceId());

        return String.join("::", platform, deviceId);
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
    .count(Materialized.as(COUNT_STORE));

Когда яперенесите событие в тему event_ingestion, я вижу, что отметка времени заносится в журналы приложений, а данные записываются в хранилище счетчиков.

Когда я перебираю хранилище счетчиков, я вижу следующее:

Key: [ANDROID::1@1539000000/1539900000], Value: 2

Хотя мое временное окно составляет 15 минут, ключ охватывает 10 дней.Если я удаляю свою реализацию TimestampExtractor из конфигурации потока (следовательно, возвращаюсь к времени обработки), ключ занимает 15 минут, как и ожидалось:

Key: [ANDROID::1@1539256500000/1539257400000], Value: 1

Что я здесь не так делаю?Есть идеи?

1 Ответ

0 голосов
/ 11 октября 2018

TimestampExtractor использует значение эпохи миллисекунд для управления окнами.Вы рассчитываете «секунды», которые помещают сообщение в неверное временное окно.

...