Cache Kafka Records с использованием Caffeine Cache Springboot - PullRequest
0 голосов
/ 01 февраля 2020

Я пытаюсь кешировать записи Kafka в течение 3 минут после перерыва после того, как истек срок ее действия и он будет удален из кэша.

Каждая входящая запись, полученная с помощью kafka customer, написанная в springboot, должна быть обновлена ​​в сначала кеш, затем, если он присутствует, мне нужно удалить следующие дубликаты записей, если они совпадают с записью кеша.

Я попытался использовать кэш кофеина, как показано ниже,

@EnableCaching
public class AppCacheManagerConfig {

    @Bean
    public CacheManager cacheManager(Ticker ticker) {
        CaffeineCache bookCache = buildCache("declineRecords", ticker, 3);
        SimpleCacheManager cacheManager = new SimpleCacheManager();
        cacheManager.setCaches(Collections.singletonList(bookCache));
        return cacheManager;
    }

    private CaffeineCache buildCache(String name, Ticker ticker, int minutesToExpire) {
        return new CaffeineCache(name, Caffeine.newBuilder().expireAfterWrite(minutesToExpire, TimeUnit.MINUTES)
                .maximumSize(100).ticker(ticker).build());
    }

    @Bean
    public Ticker ticker() {
        return Ticker.systemTicker();
    }

}

и мой Kafka Consumer как показано ниже,

@Autowired
    CachingServiceImpl cachingService;

@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", concurrency = "#{'${spring.kafka.consumer.concurrentConsumers}'}", errorHandler = "#{'${spring.kafka.consumer.errorHandler}'}")
    public void consume(Message<?> message, Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long createTime) {
        logger.info("Recieved Message: " + message.getPayload());
        try {
            boolean approveTopic = false;
            boolean duplicateRecord = false;
if (cachingService.isDuplicateCheck(declineRecord)) {
//do something with records
}
else
{
//do something with records
}
    cachingService.putInCache(xmlJSONObj, declineRecord, time);

, а моя служба кэширования - как ниже,

@Component
public class CachingServiceImpl {
    private static final Logger logger = LoggerFactory.getLogger(CachingServiceImpl.class);
    @Autowired
    CacheManager cacheManager;

    @Cacheable(value = "declineRecords", key = "#declineRecord", sync = true)
    public String putInCache(JSONObject xmlJSONObj, String declineRecord, String time) {
        logger.info("Record is Cached for 3 minutes interval check", declineRecord);
        cacheManager.getCache("declineRecords").put(declineRecord, time);
        return declineRecord;

    }

    public boolean isDuplicateCheck(String declineRecord) {
        if (null != cacheManager.getCache("declineRecords").get(declineRecord)) {
            return true;
        }
        return false;
    }
}

Но каждый раз, когда запись поступает в потребитель, мой кэш всегда пуст. Записи не хранятся.

Изменения завершены:

Я добавил файл конфигурации, как показано ниже, после прохождения предложений и дополнительных исследований и разработок. убрал некоторые из более ранних логи c, и теперь кэширование работает должным образом, но повторная проверка не выполняется, когда все три потребителя отправляют одинаковые записи.

`

  @Configuration
  public class AppCacheManagerConfig {
  public static Cache<String, Object> jsonCache = 
  Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES)
            .maximumSize(10000).recordStats().build();
    @Bean
    public CacheLoader<Object, Object> cacheLoader() {
        CacheLoader<Object, Object> cacheLoader = new CacheLoader<Object, Object>() {
            @Override
            public Object load(Object key) throws Exception {
                return null;
            }

            @Override
            public Object reload(Object key, Object oldValue) throws Exception {
                return oldValue;
            }
        };
        return cacheLoader;
    }

`

Теперь я использую приведенный выше кеш в качестве ручной установки и получения.

1 Ответ

0 голосов
/ 04 февраля 2020

Полагаю, вы пытаетесь реализовать дедупликацию записей для Kafka.

Вот аналогичное обсуждение:

https://github.com/spring-projects/spring-kafka/issues/80

Здесь текущий абстрактный класс, который вы можете расширить для достижения необходимого результата:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractFilteringMessageListener.java

Ваша служба кэширования определенно неверна: Cacheable аннотация позволяет пометить сборщики и установщики данных, чтобы добавить кеширование через АОП. В то время как в коде вы четко внедрили некоторые собственные низкоуровневые логи для обновления кэша c.

Вам могут помочь по крайней мере следующие возможные изменения:

  1. Remove @Cacheable. Вам это не нужно, потому что вы работаете с кешем вручную, поэтому он может стать источником конфликтов (особенно, когда вы используете sync = true). Если это помогает, удалите также @EnableCaching - он включает поддержку связанных с кэшем аннотаций Spring, которые вам здесь не нужны.

  2. Попробуйте удалить компонент Ticker с соответствующими параметрами для других бобов. Он не должен быть вредным для вашей конфигурации, но обычно это полезно только для тестов, нет необходимости определять его иначе.

  3. Перепроверьте, что такое declineRecord. Если это сериализованный объект, убедитесь, что сериализация работает должным образом.

  4. Добавьте recordStats() для кэша и выведите stats() для регистрации для дальнейшего анализа.

...