Kafka Steams Windowing на другое значение - PullRequest
0 голосов
/ 09 ноября 2018

Когда вы создаете окно с использованием потоков kafka, я предполагаю, что оно использует метку времени, когда была опубликована запись? Есть ли что-нибудь еще в окне чем-то еще.

Мой вариант использования - наш объект значения записи содержит временную метку, и это то, что мы хотим отображать в окне.

Если я сделаю что-то подобное, это будет окно с опубликованной отметкой времени. Я хочу, чтобы окно myObject.getCallTimestamp ()

KTable<Windowed<String>, MyObject> windowedPageViewCounts = pageViews
    .groupByKey(Serialized.with(Serdes.String(), myObjectSerde))
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .count();

EDIT:

Исходя из рекомендаций ниже, я считаю, что это то, что мне нужно делать?

public class RecordTimeStampExtractor implements TimestampExtractor {

    //default timestamp extractor
    private FailOnInvalidTimestamp failOnInvalidTimestamp = new FailOnInvalidTimestamp();

    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
        //could also use consumerRecord.topic().equals("mytopic")
        if(consumerRecord.value() instanceof MyClass) {
            MyClass myClass = (MyClass) consumerRecord.value();
            return myClass.getRecordTimestamp().toEpochMilli();
        }
        return failOnInvalidTimestamp.extract(consumerRecord,l);
    }
}

1 Ответ

0 голосов
/ 09 ноября 2018

Вы можете реализовать и настроить (через default.timestamp.extractor) пользовательский TimestampExtractor, который возвращает myObject.getCallTimestamp().

Подробнее см. В документах: https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-timestamp-extractor

...