Spring boot使用redis-stream实现监听者

工作 / 2023-05-15

Spring boot使用redis-stream实现监听者

Redis stream 是 Redis 5 引入的一种新的数据结构,它是一个高性能、高可靠性的消息队列,主要用于异步消息处理和流式数据处理。在此之前,想要使用 Redis 实现消息队列,通常可以使用例如:列表,有序集合、发布与订阅 3 种数据结构。但是 stream 相比它们具有以下的优势:

  • 支持范围查找:内置的索引功能,可以通过索引来对消息进行范围查找
  • 支持阻塞操作:避免低效的反复轮询查找消息
  • 支持 ACK:可以通过确认机制来告知已经成功处理了消息,保证可靠性
  • 支持多个消费者:多个消费者可以同时消费同一个流,Redis 会确保每个消费者都可以独立地消费流中的消息

本文将示范结合 Spring boot去使用redis的stream类型

参考 :

如何在Springboot中使用Redis5的Stream

redis 流 stream的使用总结

简化开发和学习的步奏 快速上手

注意 使用 redis 的 stream 得提前创建消费组,创建消费组需要添加一条数据

xadd stream * name test age 18

  • XADD 添加元素到末尾
  • stream是流的名字
  • *为redis将会自动为新元素生成一个可用的新ID
  • name test age 18 为参数

返回 id :1683885095976-0

messageId=1683885095976-0, stream=logstream, body=

pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <exclusions>
        <!-- 去掉对 Lettuce 的依赖,因为 Spring Boot 优先使用 Lettuce 作为 Redis 客户端 -->
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

配置

spring:
  redis:
    host: 192.168.50.189 # 地址
    #    host: 127.0.0.1
    port: 6379 # 端口
    database: 3 # 数据库索引
#***************** 自定义配置
monster:
  #***************** redis stream
  name: consumer-1
  redis-stream-map:
    mqtt:
      class-name: mqttMessageListener
      stream-group:
        mqttstream: group-1
    log:
      class-name: logMessageListener
      stream-group:
        logstream: group-2

读取配置文件

/**
 * &#064;description: redis stream 配置文件<BR/>
 * &#064;author: monster <BR/>
 * &#064;date: 2023/5 <BR/>
 */
@Configuration
@ConfigurationProperties(prefix = "monster")
@Data
public class RedisStreamConfig {

//    private Map<String,String> stream;

    private String name;
    private Map<String, StreamData> redisStreamMap;


    @Data
    public static class StreamData {
        private String className;
        private Map<String, String> streamGroup;
    }
}

从stream消费消息:

这里只说一种阻塞消费

@AutoConfiguration
public class StreamConsumerRunner {

    static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);

    /*
        @Value("${redis.stream.consumer}")
        private String consumer;
    */


    /**
     * Redis连接的线程安全工厂。
     */
    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * 线程池工作执行器
     */
    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Resource
    private RedisStreamConfig redisStreamConfig;

    @Bean
    public void streamRun() {

        // 创建配置对象
        StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions =
                StreamMessageListenerContainerOptions.builder()
                // 一次性最多拉取多少条消息
                .batchSize(10)

                // 执行消息轮询的执行器
                .executor(threadPoolTaskExecutor)
                // 消息消费异常的handler
                .errorHandler(new ErrorHandler() {
                    //异常处理
                    @Override
                    public void handleError(Throwable t) {
                        // throw new RuntimeException(t);
                        t.printStackTrace();
                    }
                })

                // 超时时间,设置为0,表示不超时(超时后会抛出异常)
                .pollTimeout(Duration.ZERO)
                // 序列化器
                .serializer(new StringRedisSerializer()).build();


        
        redisStreamConfig.getRedisStreamMap().forEach((streamK, streamV) -> {
            //根据 流:分组 订阅
            streamV.getStreamGroup().forEach((k, v) -> {
                //一旦创建,StreamMessageListenerContainer就可以订阅Redis流并消费传入的消息。
                // StreamMessageListenerContainer允许多个流读请求,并为每个读请求返回一个订阅句柄。
                // 取消订阅最终将终止后台轮询。使用键和值序列化器转换消息,以支持各种序列化策略
                StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                        StreamMessageListenerContainer.create(redisConnectionFactory, streamMessageListenerContainerOptions);
                container.receive(Consumer.from(v,
                                redisStreamConfig.getName()),
                        //表示流的读偏移量的值对象
                        StreamOffset.create(k, ReadOffset.lastConsumed())
                        //获取流订阅业务处理Bean
                        , (StreamListener) StaticMethodGetBean.getBean(streamV.getClassName()));
                //开启流订阅
                container.start();
            });
        });


    }

}