通过Redis的Lists实现简单队列
前言:
比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消费者。
RPOP/LPOP
Redis不仅可以作为缓存服务器,还可以作消息队列。它的Lists列表类型天生支持用作消息队列。
由于Redis列表使用双向链表实现的,所以无论是从头尾两边插取元素都是非常快的。
查看Redis的Lists官网(可以去查看中文的Redis )。
可以知道 Redis的Lists的命令
简单实现可以直接使用 RPUSH/LPOP 右进左出
127.0.0.1:6379> RPUSH rk v1
(integer) 1
127.0.0.1:6379> RPUSH rk v2
(integer) 2
127.0.0.1:6379> RPUSH rk v3
(integer) 3
127.0.0.1:6379> RPUSH rk v4
(integer) 4
127.0.0.1:6379> LPOP rk
"v1"
127.0.0.1:6379> LPOP rk
"v2"
127.0.0.1:6379> LPOP rk
"v3"
127.0.0.1:6379> LPOP rk
"v4"
,或者 LPUSH/RPOP左进右出。
127.0.0.1:6379> LPUSH lk v1
(integer) 1
127.0.0.1:6379> LPUSH lk v2
(integer) 2
127.0.0.1:6379> LPUSH lk v3
(integer) 3
127.0.0.1:6379> LPUSH lk v4
(integer) 4
127.0.0.1:6379> RPOP lk
"v1"
127.0.0.1:6379> RPOP lk
"v2"
127.0.0.1:6379> RPOP lk
"v3"
127.0.0.1:6379> RPOP lk
"v4"
同时使用监听者模式,来处理进入线程的任务,基本的要素已经满足了。
/**
*伪代码
**/
//spring的定时任务
@Scheduled(fixedDelay = 50)
public void task() {
log.info("RedisMoneyTask定时任务开始");
//这里用Redis队列中获取
redisTemplate.opsForList().leftPop(key);
redisTemplate.opsForList().rightPop(key);
try {
//处理业务
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("RedisMoneyTask定时任务完成");
}
这里会出现第二个问题,单纯的从Lists中获取 会获取 nul,监听者会继续的往下走,这里就得引入Redis的阻塞队列了。
BRPOP/BLPOP
BLPOP:删除并获取改列表的第一元素,或阻塞,直到有一个可用
BRPOP:删除并获取改列表的最后一个元素,或阻塞,直到有一个可用
127.0.0.1:6379> RPUSH list1 v1 v2 v3 v4
(integer) 4
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v4"
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v3"
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v2"
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v1"
127.0.0.1:6379> BRPOP list1 0
阻塞都是一样的 在等待中…
配合定时任务的 fixedDelay 策略(Spring Boot 中使用 Spring Task 实现定时任务 有介绍)
当队列没有元素时阻塞,处理完任务后50ms再进行下一个任务
/**
*伪代码
**/
//spring的定时任务
@Scheduled(fixedDelay = 50)
public void task() {
log.info("RedisMoneyTask定时任务开始");
//这里用Redis队列中获取,添加Duration.ZERO就是启用阻塞
redisTemplate.opsForList().leftPop(key,Duration.ZERO);
redisTemplate.opsForList().rightPop(key,Duration.ZERO);
try {
//处理业务
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("RedisMoneyTask定时任务完成");
}
这里又会引出新问题,如果业务没处理成功程序就意外中断了。
BRPOPLPUSH
BRPOPLPUSH
是 RPOPLPUSH 的阻塞版本。 当 source 包含元素的时候,这个命令表现得跟 RPOPLPUSH 一模一样。 当 source 是空的时候,Redis将会阻塞这个连接,直到另一个客户端 push 元素进入或者达到 timeout 时限。 timeout 为 0 能用于无限期阻塞客户端。
/**
*伪代码
**/
//spring的定时任务
@Scheduled(fixedDelay = 50)
public void task() {
log.info("RedisMoneyTask定时任务开始");
//判断目标队列是否有对象
Long aLong = listRedisUtils.listLlen(RedisEnum.REDIS_MONEY_DESTINATION_KEY);
log.info("aLong:{}", aLong);
if (aLong > 0) {
//获取目标队列对象判断是否保存到数据库
JSONObject o1 = (JSONObject) listRedisUtils.listRightPop(RedisEnum.REDIS_MONEY_DESTINATION_KEY);
String id = o1.getStr("id");
log.info("id:{}", id);
Demo byId = demoService.getById(id);
if (ObjectUtil.isNotEmpty(o1)
&&
ObjectUtil.isEmpty(byId)) {
//如果目标对象没有保存到数据库的话把对象从源队列最后加进去 插队
listRedisUtils.listAddInEnd(RedisEnum.REDIS_MONEY_SOURCE_KEY, o1);
}
//如果目标对象已经保存到数据库无操作
}
//返回最新的对象
return (JSONObject) listRedisUtils
.listBRightPopLPush(
RedisEnum.REDIS_MONEY_SOURCE_KEY
, RedisEnum.REDIS_MONEY_DESTINATION_KEY);
}
try {
//处理业务
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("RedisMoneyTask定时任务完成");
}