Spring boot使用redis-stream实现监听者
Redis stream 是 Redis 5 引入的一种新的数据结构,它是一个高性能、高可靠性的消息队列,主要用于异步消息处理和流式数据处理。在此之前,想要使用 Redis 实现消息队列,通常可以使用例如:列表,有序集合、发布与订阅 3 种数据结构。但是 stream 相比它们具有以下的优势:
- 支持范围查找:内置的索引功能,可以通过索引来对消息进行范围查找
- 支持阻塞操作:避免低效的反复轮询查找消息
- 支持 ACK:可以通过确认机制来告知已经成功处理了消息,保证可靠性
- 支持多个消费者:多个消费者可以同时消费同一个流,Redis 会确保每个消费者都可以独立地消费流中的消息
本文将示范结合 Spring boot去使用redis的stream类型
参考 :
简化开发和学习的步奏 快速上手
注意 使用 redis 的 stream 得提前创建消费组,创建消费组需要添加一条数据
xadd stream * name test age 18
- XADD 添加元素到末尾
- stream是流的名字
- *为redis将会自动为新元素生成一个可用的新ID
- name test age 18 为参数
返回 id :1683885095976-0
messageId=1683885095976-0, stream=logstream, body=
pom
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<exclusions>
<!-- 去掉对 Lettuce 的依赖,因为 Spring Boot 优先使用 Lettuce 作为 Redis 客户端 -->
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
配置
spring:
redis:
host: 192.168.50.189 # 地址
# host: 127.0.0.1
port: 6379 # 端口
database: 3 # 数据库索引
#***************** 自定义配置
monster:
#***************** redis stream
name: consumer-1
redis-stream-map:
mqtt:
class-name: mqttMessageListener
stream-group:
mqttstream: group-1
log:
class-name: logMessageListener
stream-group:
logstream: group-2
读取配置文件
/**
* @description: redis stream 配置文件<BR/>
* @author: monster <BR/>
* @date: 2023/5 <BR/>
*/
@Configuration
@ConfigurationProperties(prefix = "monster")
@Data
public class RedisStreamConfig {
// private Map<String,String> stream;
private String name;
private Map<String, StreamData> redisStreamMap;
@Data
public static class StreamData {
private String className;
private Map<String, String> streamGroup;
}
}
从stream消费消息:
这里只说一种阻塞消费
@AutoConfiguration
public class StreamConsumerRunner {
static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);
/*
@Value("${redis.stream.consumer}")
private String consumer;
*/
/**
* Redis连接的线程安全工厂。
*/
@Resource
private RedisConnectionFactory redisConnectionFactory;
/**
* 线程池工作执行器
*/
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
private RedisStreamConfig redisStreamConfig;
@Bean
public void streamRun() {
// 创建配置对象
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions =
StreamMessageListenerContainerOptions.builder()
// 一次性最多拉取多少条消息
.batchSize(10)
// 执行消息轮询的执行器
.executor(threadPoolTaskExecutor)
// 消息消费异常的handler
.errorHandler(new ErrorHandler() {
//异常处理
@Override
public void handleError(Throwable t) {
// throw new RuntimeException(t);
t.printStackTrace();
}
})
// 超时时间,设置为0,表示不超时(超时后会抛出异常)
.pollTimeout(Duration.ZERO)
// 序列化器
.serializer(new StringRedisSerializer()).build();
redisStreamConfig.getRedisStreamMap().forEach((streamK, streamV) -> {
//根据 流:分组 订阅
streamV.getStreamGroup().forEach((k, v) -> {
//一旦创建,StreamMessageListenerContainer就可以订阅Redis流并消费传入的消息。
// StreamMessageListenerContainer允许多个流读请求,并为每个读请求返回一个订阅句柄。
// 取消订阅最终将终止后台轮询。使用键和值序列化器转换消息,以支持各种序列化策略
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(redisConnectionFactory, streamMessageListenerContainerOptions);
container.receive(Consumer.from(v,
redisStreamConfig.getName()),
//表示流的读偏移量的值对象
StreamOffset.create(k, ReadOffset.lastConsumed())
//获取流订阅业务处理Bean
, (StreamListener) StaticMethodGetBean.getBean(streamV.getClassName()));
//开启流订阅
container.start();
});
});
}
}