Я пытаюсь кешировать записи Kafka в течение 3 минут после перерыва после того, как истек срок ее действия и он будет удален из кэша.
Каждая входящая запись, полученная с помощью kafka customer, написанная в springboot, должна быть обновлена в сначала кеш, затем, если он присутствует, мне нужно удалить следующие дубликаты записей, если они совпадают с записью кеша.
Я попытался использовать кэш кофеина, как показано ниже,
@EnableCaching
public class AppCacheManagerConfig {
@Bean
public CacheManager cacheManager(Ticker ticker) {
CaffeineCache bookCache = buildCache("declineRecords", ticker, 3);
SimpleCacheManager cacheManager = new SimpleCacheManager();
cacheManager.setCaches(Collections.singletonList(bookCache));
return cacheManager;
}
private CaffeineCache buildCache(String name, Ticker ticker, int minutesToExpire) {
return new CaffeineCache(name, Caffeine.newBuilder().expireAfterWrite(minutesToExpire, TimeUnit.MINUTES)
.maximumSize(100).ticker(ticker).build());
}
@Bean
public Ticker ticker() {
return Ticker.systemTicker();
}
}
и мой Kafka Consumer как показано ниже,
@Autowired
CachingServiceImpl cachingService;
@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", concurrency = "#{'${spring.kafka.consumer.concurrentConsumers}'}", errorHandler = "#{'${spring.kafka.consumer.errorHandler}'}")
public void consume(Message<?> message, Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long createTime) {
logger.info("Recieved Message: " + message.getPayload());
try {
boolean approveTopic = false;
boolean duplicateRecord = false;
if (cachingService.isDuplicateCheck(declineRecord)) {
//do something with records
}
else
{
//do something with records
}
cachingService.putInCache(xmlJSONObj, declineRecord, time);
, а моя служба кэширования - как ниже,
@Component
public class CachingServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(CachingServiceImpl.class);
@Autowired
CacheManager cacheManager;
@Cacheable(value = "declineRecords", key = "#declineRecord", sync = true)
public String putInCache(JSONObject xmlJSONObj, String declineRecord, String time) {
logger.info("Record is Cached for 3 minutes interval check", declineRecord);
cacheManager.getCache("declineRecords").put(declineRecord, time);
return declineRecord;
}
public boolean isDuplicateCheck(String declineRecord) {
if (null != cacheManager.getCache("declineRecords").get(declineRecord)) {
return true;
}
return false;
}
}
Но каждый раз, когда запись поступает в потребитель, мой кэш всегда пуст. Записи не хранятся.
Изменения завершены:
Я добавил файл конфигурации, как показано ниже, после прохождения предложений и дополнительных исследований и разработок. убрал некоторые из более ранних логи c, и теперь кэширование работает должным образом, но повторная проверка не выполняется, когда все три потребителя отправляют одинаковые записи.
`
@Configuration
public class AppCacheManagerConfig {
public static Cache<String, Object> jsonCache =
Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES)
.maximumSize(10000).recordStats().build();
@Bean
public CacheLoader<Object, Object> cacheLoader() {
CacheLoader<Object, Object> cacheLoader = new CacheLoader<Object, Object>() {
@Override
public Object load(Object key) throws Exception {
return null;
}
@Override
public Object reload(Object key, Object oldValue) throws Exception {
return oldValue;
}
};
return cacheLoader;
}
`
Теперь я использую приведенный выше кеш в качестве ручной установки и получения.