У меня есть код ниже Spring boot
для получения значений всякий раз, когда поток Redis добавляется с новой записью.Проблема в том, что получатель никогда не получает никакого сообщения, также, когда подписчик, отмеченный subscriber.isActive()
, всегда неактивен.Что не так в этом коде?Что я пропустил? Док для справки.
При запуске весенней загрузки инициализируйте необходимые ресурсы redis
Фабрика соединений салата
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory("127.0.0.1", 6379);
}
RedisTemplate с фабрики соединений
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
return redisTemplate;
}
Контроллер покоядобавить данные в поток redis
@PutMapping("/{name}")
public String post(@PathVariable String name) {
return redisTemplate.opsForStream().add(StreamRecords.newRecord().in("streamx").ofObject(name)).getValue();
}
прослушиватель императивных сообщений в стиле JMS
@Component
public class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("message received: " + message.getValue());
}
}
инициализировать прослушиватель
@Bean
public Subscription listener(MyStreamListener streamListener, RedisConnectionFactory redisConnectionFactory) throws InterruptedException {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(redisConnectionFactory);
Subscription subscription = container.receive(Consumer.from("my-group-1", "consumer-1"),
StreamOffset.create("streamx", ReadOffset.latest())), streamListener);
System.out.println(subscription.isActive()); // always false
return subscription;
}
Хотя я могу добавить кпоток через API.