Почему Redis не вызывает мой MessageListener для событий __keyevent@*__:expired? - PullRequest
0 голосов
/ 17 апреля 2019

Я недавно обновил Spring Boot до 2.1.4.RELEASE и Spring Cloud до Greenwich.SR1. Мой сервис работает на Java 11. Моя единственная зависимость от Redis — через spring-boot-starter-data-redis. Хотя я настроил Redis, установив notify-keyspace-events Ex, я всё равно не получаю от него никаких событий истечения срока действия ключей. Я впервые собираюсь использовать такие события для реализации тайм-аутов. Что могло пойти не так?

Пожалуйста, помогите!

Это моя RedisConfiguration:

/**
 * Spring configuration that wires the Redis connection, the typed templates,
 * the JSON serializers and the pub/sub listener container used by the service.
 *
 * <p>Two kinds of subscriptions are registered on the listener container:
 * a regular channel topic for verification codes and a pattern subscription
 * on {@code __keyevent@*__:expired} for key-expiration events.
 * NOTE(review): key-expiration events are only published when the Redis server
 * has {@code notify-keyspace-events} configured to include them (e.g. {@code Ex});
 * confirm the server configuration.
 */
@Configuration
public class RedisConfiguration {

    // Kept as a String because it is injected straight from the property source;
    // parsed to an int when the connection factory is built.
    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.verification-code-topic}")
    private String verificationCodeTopic;

    /**
     * Builds a Lettuce connection factory for a standalone Redis node using
     * the configured host and port.
     *
     * @return the connection factory shared by the templates and the listener container
     * @throws NumberFormatException if the configured port is not a valid integer
     */
    @Bean
    public RedisConnectionFactory redisConnectionFactory(){
        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(host);
        // parseInt avoids the needless boxing that Integer.valueOf would introduce.
        configuration.setPort(Integer.parseInt(port));
        return new LettuceConnectionFactory(configuration);
    }

    /**
     * Template for storing {@code ResetPasswordRequest} values under {@code FundRedisKey} keys.
     * Marked primary so that it wins when a {@code RedisTemplate} is injected by type.
     *
     * @return the template configured with the JSON key and value serializers
     */
    @Bean
    @Primary
    public RedisTemplate<FundRedisKey, ResetPasswordRequest> resetPasswordRedisTemplate(){
        // Diamond operator instead of the raw type removes the unchecked-conversion warning.
        RedisTemplate<FundRedisKey, ResetPasswordRequest> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setValueSerializer(resetPasswordRequestSerializer());
        redisTemplate.setKeySerializer(redisKeySerializer());
        return redisTemplate;
    }

    /**
     * Template for storing {@code VerificationMessage} values under {@code FundRedisKey} keys.
     *
     * @return the template configured with the JSON key and value serializers
     */
    @Bean
    public RedisTemplate<FundRedisKey, VerificationMessage> verificationMessageRedisTemplate(){
        RedisTemplate<FundRedisKey, VerificationMessage> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setValueSerializer(verificationMessageSerializer());
        redisTemplate.setKeySerializer(redisKeySerializer());
        return redisTemplate;
    }

    /**
     * Listener that handles messages published on the verification-code topic.
     *
     * @return the verification-code subscriber
     */
    @Bean
    @Primary
    public MessageListener verificationCodeMessageListener(){
        return new VerificationCodeSubscriber(verificationMessageSerializer(),
                resetPasswordRedisTemplate());
    }

    /**
     * Listener that handles key-expiration events for reset-password requests.
     *
     * @return the reset-password timeout subscriber
     */
    @Bean
    public MessageListener resetPasswordTimeoutListener(){
        return new ResetPasswordTimeoutSubscriber(resetPasswordRequestSerializer());
    }

    /**
     * Adapter wrapping {@link #verificationCodeMessageListener()} for registration
     * on the listener container.
     *
     * @return the adapter delegating to the verification-code listener
     */
    @Bean
    @Primary
    public MessageListenerAdapter verificationCodeMessageListenerAdapter(){
        return new MessageListenerAdapter(verificationCodeMessageListener());
    }

    /**
     * Adapter wrapping {@link #resetPasswordTimeoutListener()} for registration
     * on the listener container.
     *
     * @return the adapter delegating to the reset-password timeout listener
     */
    @Bean
    public MessageListenerAdapter resetPasswordTimeoutMessageListenerAdapter(){
        return new MessageListenerAdapter(resetPasswordTimeoutListener());
    }

    /**
     * Channel on which verification-code messages are published and consumed.
     *
     * @return the channel topic named by {@code spring.redis.verification-code-topic}
     */
    @Bean
    public ChannelTopic verificationCodeTopic(){
        return new ChannelTopic(verificationCodeTopic);
    }

    /**
     * Listener container that subscribes the verification-code adapter to its channel
     * and the reset-password timeout adapter to the key-expiration event pattern.
     *
     * @param executor the {@code taskExecutor} bean used to dispatch incoming messages
     * @return the configured listener container
     */
    @Bean
    @DependsOn(value = "taskExecutor")
    public RedisMessageListenerContainer fundMessageListenerContainer(
            @Qualifier("taskExecutor")Executor executor){
        RedisMessageListenerContainer messageListenerContainer = new RedisMessageListenerContainer();
        messageListenerContainer.setConnectionFactory(redisConnectionFactory());
        messageListenerContainer.addMessageListener(
                verificationCodeMessageListenerAdapter(), verificationCodeTopic());
        // Pattern subscription: matches expiration events from every database index.
        messageListenerContainer.addMessageListener(
                resetPasswordTimeoutMessageListenerAdapter(), new PatternTopic("__keyevent@*__:expired"));
        messageListenerContainer.setTaskExecutor(executor);
        return messageListenerContainer;
    }

    /**
     * Publisher that sends verification messages to the verification-code topic.
     *
     * @return the verification-code publisher
     */
    @Bean
    public MessagePublisher verificationCodeMessagePublisher(){
        return new VerificationCodePublisher(
                verificationMessageRedisTemplate(), verificationCodeTopic());

    }

    /**
     * JSON serializer for {@code VerificationMessage} payloads.
     *
     * @return a Jackson-based serializer bound to {@code VerificationMessage}
     */
    @Bean
    public RedisSerializer verificationMessageSerializer(){
        return new Jackson2JsonRedisSerializer<>(VerificationMessage.class);
    }

    /**
     * JSON serializer for {@code ResetPasswordRequest} payloads. Marked primary so
     * that it wins when a {@code RedisSerializer} is injected by type.
     *
     * @return a Jackson-based serializer bound to {@code ResetPasswordRequest}
     */
    @Bean
    @Primary
    public RedisSerializer resetPasswordRequestSerializer(){
        return new Jackson2JsonRedisSerializer<>(ResetPasswordRequest.class);
    }

    /**
     * JSON serializer for {@code FundRedisKey} keys.
     *
     * @return a Jackson-based serializer bound to {@code FundRedisKey}
     */
    @Bean
    public RedisSerializer redisKeySerializer(){
        return new Jackson2JsonRedisSerializer<>(FundRedisKey.class);
    }
}

А вот мой ResetPasswordTimeoutSubscriber:

/**
 * Redis message listener intended to react to reset-password timeouts.
 *
 * <p>Each incoming message body is deserialized with the injected serializer
 * and cast to {@code ResetPasswordRequest}; the notification step itself is
 * still a TODO.
 */
@Component
public class ResetPasswordTimeoutSubscriber implements MessageListener {

    @Value("${spring.redis.key}")
    private String key;

    private final RedisSerializer messageSerializer;

    /**
     * @param messageSerializer serializer used to decode the body of incoming messages
     */
    public ResetPasswordTimeoutSubscriber(RedisSerializer messageSerializer){
        this.messageSerializer = messageSerializer;
    }

    /**
     * Decodes the message body into a {@code ResetPasswordRequest}.
     *
     * <p>NOTE(review): for {@code __keyevent@*__:expired} notifications Redis is
     * documented to publish the name of the expired key as the payload, not the
     * stored value — confirm that this serializer matches what actually arrives.
     *
     * @param message the received Redis message
     * @param pattern the pattern that matched the channel, as supplied by the container
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] payload = message.getBody();
        Object decoded = messageSerializer.deserialize(payload);
        ResetPasswordRequest expiredRequest = (ResetPasswordRequest) decoded;

        //TODO Send operation timeout notification
    }
}

Вот моя конфигурация TaskExecutor

/**
 * Enables asynchronous and scheduled execution and supplies the executors
 * backing both, sized from the {@code JHipsterProperties} async settings.
 */
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer, SchedulingConfigurer {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final JHipsterProperties jHipsterProperties;

    /**
     * @param jHipsterProperties application properties providing the async pool settings
     */
    public AsyncConfiguration(JHipsterProperties jHipsterProperties) {
        this.jHipsterProperties = jHipsterProperties;
    }

    /**
     * Creates the executor exposed as the {@code taskExecutor} bean: a thread pool
     * configured from the async properties and wrapped in an
     * {@code ExceptionHandlingAsyncTaskExecutor}.
     *
     * @return the wrapped async task executor
     */
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("app-1-Executor-");
        pool.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
        pool.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
        pool.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
        Executor wrapped = new ExceptionHandlingAsyncTaskExecutor(pool);
        return wrapped;
    }

    /**
     * @return a {@code SimpleAsyncUncaughtExceptionHandler} for exceptions escaping async methods
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

    /**
     * Registers {@link #scheduledTaskExecutor()} as the scheduler for scheduled tasks.
     *
     * @param taskRegistrar the registrar to configure
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ScheduledExecutorService scheduler = scheduledTaskExecutor();
        taskRegistrar.setScheduler(scheduler);
    }

    /**
     * Scheduled thread pool whose size equals the configured async core pool size.
     *
     * @return the scheduled executor service bean
     */
    @Bean
    public ScheduledExecutorService scheduledTaskExecutor() {
        int poolSize = jHipsterProperties.getAsync().getCorePoolSize();
        return Executors.newScheduledThreadPool(poolSize);
    }
}

1 Ответ

0 голосов
/ 18 апреля 2019

Я не поставил аннотацию @Bean на redisKeySerializer. Исправленный (правильный) вариант кода я разместил в теле вопроса.

...