Получать данные потоков Redis, используя Spring & Lettuce - PullRequest
0 голосов
/ 20 сентября 2019

У меня есть код ниже Spring boot для получения значений всякий раз, когда поток Redis добавляется с новой записью.Проблема в том, что получатель никогда не получает никакого сообщения, также, когда подписчик, отмеченный subscriber.isActive(), всегда неактивен.Что не так в этом коде?Что я пропустил? Док для справки.

При запуске весенней загрузки инициализируйте необходимые ресурсы redis

Фабрика соединений салата

@Bean
public RedisConnectionFactory redisConnectionFactory() {
    return new LettuceConnectionFactory("127.0.0.1", 6379);
}

RedisTemplate с фабрики соединений

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(connectionFactory);
    return redisTemplate;
}

Контроллер покоядобавить данные в поток redis

@PutMapping("/{name}")
public String post(@PathVariable String name) {
    return redisTemplate.opsForStream().add(StreamRecords.newRecord().in("streamx").ofObject(name)).getValue();
}

прослушиватель императивных сообщений в стиле JMS

@Component
public class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

@Override
public void onMessage(MapRecord<String, String, String> message) {
    System.out.println("message received: " + message.getValue());
}

}

инициализировать прослушиватель

  @Bean
public Subscription listener(MyStreamListener streamListener, RedisConnectionFactory redisConnectionFactory) throws InterruptedException {
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
            .create(redisConnectionFactory);
    Subscription subscription = container.receive(Consumer.from("my-group-1", "consumer-1"),
            StreamOffset.create("streamx", ReadOffset.latest())), streamListener);
    System.out.println(subscription.isActive()); // always false
    return subscription;
}

Хотя я могу добавить кпоток через API.

...