Итак, чтобы решить эту проблему, я создал пользовательский TimestampExtractor
и использовал его, чтобы изменить время создания окна потоков для записи времени из полезной нагрузки, как показано ниже.
public class RecordTimeStampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());
Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());
return recordTimestamp.getTime();
}
}
, поэтому теперь я протестировал его смой местный часовой пояс со вчерашнего дня, который является IST 05:30, и он работает нормально, а потоки kafka создают окна на основе метки времени записей.Будет также тестироваться с другим часовым поясом и обновит ответ