通过Redis的Lists实现简单队列

工作 / java / 2022-04-07

通过Redis的Lists实现简单队列

前言:

比如有两个进程A和B,它们共享一个固定大小的缓冲区,A进程产生数据放入缓冲区,B进程从缓冲区中取出数据进行计算,那么这里其实就是一个生产者和消费者的模式,A相当于生产者,B相当于消费者。

RPOP/LPOP

Redis不仅可以作为缓存服务器,还可以作消息队列。它的Lists列表类型天生支持用作消息队列。

Redis Lists链表

由于Redis列表使用双向链表实现的,所以无论是从头尾两边插取元素都是非常快的。

查看Redis的Lists官网(可以去查看中文的Redis )。

可以知道 Redis的Lists的命令

简单实现可以直接使用 RPUSH/LPOP 右进左出

127.0.0.1:6379> RPUSH rk v1
(integer) 1
127.0.0.1:6379> RPUSH rk v2
(integer) 2
127.0.0.1:6379> RPUSH rk v3
(integer) 3
127.0.0.1:6379> RPUSH rk v4
(integer) 4
127.0.0.1:6379> LPOP rk
"v1"
127.0.0.1:6379> LPOP rk
"v2"
127.0.0.1:6379> LPOP rk
"v3"
127.0.0.1:6379> LPOP rk
"v4"

,或者 LPUSH/RPOP左进右出。

127.0.0.1:6379> LPUSH lk v1
(integer) 1
127.0.0.1:6379> LPUSH lk v2
(integer) 2
127.0.0.1:6379> LPUSH lk v3
(integer) 3
127.0.0.1:6379> LPUSH lk v4
(integer) 4
127.0.0.1:6379> RPOP lk
"v1"
127.0.0.1:6379> RPOP lk
"v2"
127.0.0.1:6379> RPOP lk
"v3"
127.0.0.1:6379> RPOP lk
"v4"

同时使用监听者模式,来处理进入线程的任务,基本的要素已经满足了。

/**
*伪代码
**/

//spring的定时任务
@Scheduled(fixedDelay = 50)
public void task() {
    log.info("RedisMoneyTask定时任务开始");
    //这里用Redis队列中获取
    redisTemplate.opsForList().leftPop(key);
    redisTemplate.opsForList().rightPop(key);
    try {
        //处理业务
         Thread.sleep(5000);
    } catch (InterruptedException e) {
          e.printStackTrace();
    }
    log.info("RedisMoneyTask定时任务完成");
}

这里会出现第二个问题,单纯的从Lists中获取 会获取 nul,监听者会继续的往下走,这里就得引入Redis的阻塞队列了。

BRPOP/BLPOP

BLPOP:删除并获取改列表的第一元素,或阻塞,直到有一个可用

BRPOP:删除并获取改列表的最后一个元素,或阻塞,直到有一个可用

127.0.0.1:6379> RPUSH list1 v1 v2 v3 v4
(integer) 4
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v4"
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v3"
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v2"
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "v1"
127.0.0.1:6379> BRPOP list1 0


阻塞都是一样的 在等待中…

配合定时任务的 fixedDelay 策略(Spring Boot 中使用 Spring Task 实现定时任务 有介绍)

当队列没有元素时阻塞,处理完任务后50ms再进行下一个任务

/**
*伪代码
**/

//spring的定时任务
@Scheduled(fixedDelay = 50)
public void task() {
    log.info("RedisMoneyTask定时任务开始");
    //这里用Redis队列中获取,添加Duration.ZERO就是启用阻塞
    redisTemplate.opsForList().leftPop(key,Duration.ZERO);
    redisTemplate.opsForList().rightPop(key,Duration.ZERO);
    try {
        //处理业务
         Thread.sleep(5000);
    } catch (InterruptedException e) {
          e.printStackTrace();
    }
    log.info("RedisMoneyTask定时任务完成");
}

这里又会引出新问题,如果业务没处理成功程序就意外中断了。
redis Lists链表.png

BRPOPLPUSH

BRPOPLPUSHRPOPLPUSH 的阻塞版本。 当 source 包含元素的时候,这个命令表现得跟 RPOPLPUSH 一模一样。 当 source 是空的时候,Redis将会阻塞这个连接,直到另一个客户端 push 元素进入或者达到 timeout 时限。 timeout 为 0 能用于无限期阻塞客户端。

/**
*伪代码
**/

//spring的定时任务
@Scheduled(fixedDelay = 50)
public void task() {
    log.info("RedisMoneyTask定时任务开始");
         //判断目标队列是否有对象
        Long aLong = listRedisUtils.listLlen(RedisEnum.REDIS_MONEY_DESTINATION_KEY);
        log.info("aLong:{}", aLong);
        if (aLong > 0) {
            //获取目标队列对象判断是否保存到数据库
            JSONObject o1 = (JSONObject) listRedisUtils.listRightPop(RedisEnum.REDIS_MONEY_DESTINATION_KEY);
            String id = o1.getStr("id");
            log.info("id:{}", id);
            Demo byId = demoService.getById(id);
            if (ObjectUtil.isNotEmpty(o1)
                    &&
                    ObjectUtil.isEmpty(byId)) {
                //如果目标对象没有保存到数据库的话把对象从源队列最后加进去 插队
                listRedisUtils.listAddInEnd(RedisEnum.REDIS_MONEY_SOURCE_KEY, o1);
            }
            //如果目标对象已经保存到数据库无操作
        }
    	//返回最新的对象
        return (JSONObject) listRedisUtils
                .listBRightPopLPush(
                        RedisEnum.REDIS_MONEY_SOURCE_KEY
                        , RedisEnum.REDIS_MONEY_DESTINATION_KEY);
    }

    try {
        //处理业务
         Thread.sleep(5000);
    } catch (InterruptedException e) {
          e.printStackTrace();
    }
    log.info("RedisMoneyTask定时任务完成");
}