百行代码实现基于Redis的可靠延迟队列_Redis_脚本之家


本站和网页 https://www.jb51.net/article/252633.htm 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

百行代码实现基于Redis的可靠延迟队列_Redis_脚本之家
脚本之家
服务器常用软件
手机版
投稿中心
关注微信
快捷导航
软件下载
android
MAC
驱动下载
字体下载
DLL
源码下载
PHP
ASP.NET
ASP
JSP
软件编程
C#
JAVA
C 语言
Delphi
Android
网络编程
PHP
ASP.NET
ASP
JavaScript
在线工具
CSS格式化
JS格式化
Html转化为Js
数据库
MYSQL
MSSQL
oracle
DB2
MARIADB
CMS
PHPCMS
DEDECMS
帝国CMS
WordPress
常用工具
PHP开发工具
python
Photoshop
必备软件
网站首页
网页制作
网络编程
脚本专栏
脚本下载
数据库
服务器
电子书籍
操作系统
网站运营
平面设计
其它
媒体动画
电脑基础
硬件教程
网络安全
MsSql
Mysql
mariadb
oracle
DB2
mssql2008
mssql2005
SQLite
PostgreSQL
MongoDB
Redis
Access
数据库文摘
数据库其它
您的位置:首页 → 数据库 → Redis → Redis 可靠延迟队列
百行代码实现基于Redis的可靠延迟队列
更新时间:2022年06月23日 08:28:34 作者:Finley
本文主要介绍了百行代码实现基于Redis的可靠延迟队列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
目录原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库。不过问题不大,没有轮子我们自己造😎。
本文的完整代码实现在hdt3213/delayqueue,可以直接 go get 安装使用。
使用有序集合结构实现延时队列的方法已经广为人知,无非是将消息作为有序集合的 member, 投递时间戳作为 score 使用 zrangebyscore 命令搜索已到投递时间的消息然后将其发给消费者。
然而消息队列不是将消息发给消费者就万事大吉,它们还有一个重要职责是确保送达和消费。通常的实现方式是当消费者收到消息后向消息队列返回确认(ack),若消费者返回否定确认(nack)或超时未返回,消息队列则会按照预定规则重新发送,直到到达最大重试次数后停止。如何实现 ack 和重试机制是我们要重点考虑的问题。
我们的消息队列允许分布式地部署多个生产者和消费者,消费者实例定时执行 lua 脚本驱动消息在队列中的流转无需部署额外组件。由于 Redis 保证了 lua 脚本执行的原子性,整个流程无需加锁。
消费者采用拉模式获得消息,保证每条消息至少投递一次,消息队列会重试超时或者被否定确认的消息(nack) 直至到达最大重试次数。一条消息最多有一个消费者正在处理,减少了要考虑的并发问题。
请注意:若消费时间超过了 MaxConsumeDuration 消息队列会认为消费超时并重新投递,此时可能有多个消费者同时消费。
具体使用也非常简单,只需要注册处理消息的回调函数并调用 start() 即可:
package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// 注册处理消息的回调函数
// 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息
return true
})
// 发送延时消息
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
// start consume
done := queue.StartConsume()
<-done
由于数据存储在 redis 中所以我们最多能保证在 redis 无故障且消息队列相关 key 未被外部篡改的情况下不会丢失消息。
原理详解
消息队列涉及几个关键的 redis 数据结构:
msgKey: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表是为了利用 TTL 机制避免pendingKey: 有序集合类型,member 为消息 ID, score 为投递时间的 unix 时间戳。readyKey: 列表类型,需要投递的消息 ID。unAckKey: 有序集合类型,member 为消息 ID, score 为重试时间的 unix 时间戳。retryKey: 列表类型,已到重试时间的消息 IDgarbageKey: 集合类型,用于暂存已达重试上线的消息 IDretryCountKey: 哈希表类型,键为消息 ID, 值为剩余的重试次数
流程如下图所示:
由于我们允许分布式地部署多个消费者,每个消费者都在定时执行 lua 脚本,所以多个消费者可能处于上述流程中不同状态,我们无法预知(或控制)上图中五个操作发生的先后顺序,也无法控制有多少实例正在执行同一个操作。
因此我们需要保证上图中五个操作满足三个条件:
都是原子性的不会重复处理同一条消息操作前后消息队列始终处于正确的状态
只要满足这三个条件,我们就可以部署多个实例且不需要使用分布式锁等技术来进行状态同步。
是不是听起来有点吓人?😂 其实简单的很,让我们一起来详细看看吧~
pending2ReadyScript
pending2ReadyScript 使用 zrangebyscore 扫描已到投递时间的消息ID并把它们移动到 ready 中:
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中找出已到投递时间的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 将他们放入 ready key 中
for _,v in ipairs(msgs) do
table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中删除已投递的消息
ready2UnackScript
ready2UnackScript 从 ready 或者 retry 中取出一条消息发送给消费者并放入 unack 中,类似于 RPopLPush:
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
unack2RetryScript
unack2RetryScript 从 retry 中找出所有已到重试时间的消息并把它们移动到 unack 中:
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重试时间的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查询剩余重试次数
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if tonumber(v) > 0 then -- 剩余次数大于 0
redis.call("HIncrBy", KEYS[2], k, -1) -- 减少剩余重试次数
redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
else -- 剩余重试次数为 0
redis.call("HDel", KEYS[2], k) -- 删除重试次数记录
redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后续删除
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 将已处理的消息从 unack key 中删除
因为 redis 要求 lua 脚本必须在执行前在 KEYS 参数中声明自己要访问的 key, 而我们将每个 msg 有一个独立的 key,我们在执行 unack2RetryScript 之前是不知道哪些 msg key 需要被删除。所以 lua 脚本只将需要删除的消息记在 garbage key 中,脚本执行完后再通过 del 命令将他们删除:
func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
if len(msgIds) == 0 {
return nil
// allow concurrent clean
msgKeys := make([]string, 0, len(msgIds))
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("del msgs failed: %v", err)
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("remove from garbage key failed: %v", err)
return nil
之前提到的 lua 脚本都是原子性执行的,不会有其它命令插入其中。 gc 函数由 3 条 redis 命令组成,在执行过程中可能会有其它命令插入执行过程中,不过考虑到一条消息进入垃圾回收流程之后不会复活所以不需要保证 3 条命令原子性。
ack
ack 只需要将消息彻底删除即可:
func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
return nil
否定确认只需要将 unack key 中消息的重试时间改为现在,随后执行的 unack2RetryScript 会立即将它移动到 retry key
func (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
return nil
consume
消息队列的核心逻辑是每秒执行一次的 consume 函数,它负责调用上述脚本将消息转移到正确的集合中并回调 consumer 来消费消息:
func (q *DelayQueue) consume() error {
// 执行 pending2ready,将已到时间的消息转移到 ready
err := q.pending2Ready()
if err != nil {
return err
// 循环调用 ready2Unack 拉取消息进行消费
var fetchCount uint
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
if err != nil {
return err
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
if err != nil {
return err
if fetchCount >= q.fetchLimit {
break
// 将 nack 或超时的消息放入重试队列
err = q.unack2Retry()
if err != nil {
return err
// 清理已达到最大重试次数的消息
err = q.garbageCollect()
if err != nil {
return err
// 消费重试队列
fetchCount = 0
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
if err != nil {
return err
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
if err != nil {
return err
if fetchCount >= q.fetchLimit {
break
return nil
至此一个简单可靠的延时队列就做好了,何不赶紧开始试用呢😘😋?
到此这篇关于百行代码实现基于Redis的可靠延迟队列的文章就介绍到这了,更多相关百行代码实现基于Redis的可靠延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
您可能感兴趣的文章:Redis如何实现延迟队列redis延迟双删策略示例讲解Redis实现延迟队列的全流程详解Redis优雅地实现延迟队列的方法分享Redisson延迟队列执行流程源码解析关于redis的延迟双删策略总结浅谈Redis常见延迟问题定位与分析使用redis实现延迟通知功能(Redis过期键通知)Redis延迟队列和分布式延迟队列的简答实现基于Redis延迟队列的实现代码redis实现延迟任务的项目实践
Redis
可靠延迟队列
相关文章
Redis 单机安装和哨兵模式集群安装的实现本文主要介绍了Redis 单机安装和哨兵模式集群安装的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 2022-07-07
k8s部署redis集群实现过程实例详解这篇文章主要为大家介绍了k8s部署redis集群实现过程实例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪 2023-02-02
SpringSession通过Redis统计在线用户数量的实现代码这篇文章主要介绍了SpringSession通过Redis统计在线用户数量,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下 2023-04-04
Redis主从复制操作和配置详情这篇文章主要介绍了Redis主从复制操作和配置详情,文章通过围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下 2022-09-09
redis protocol通信协议及使用详解这篇文章主要为大家介绍了redis protocol通信协议及使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪 2022-07-07
Redis Sentinel服务配置流程(详解)下面小编就为大家带来一篇Redis Sentinel服务配置流程(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧 2017-03-03
Redis妙用之存储用户token问题这篇文章主要介绍了Redis妙用之存储用户token问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教 2023-03-03
Redis数据结构之跳跃表使用学习这篇文章主要为大家介绍了Redis数据结构之跳跃表使用学习,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪 2023-07-07
关于使用Redisson订阅数问题本文主要介绍了关于使用Redisson订阅数问题,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 2022-01-01
Redis 事务与过期时间详细介绍这篇文章主要介绍了Redis 事务与过期时间详细介绍的相关资料,需要的朋友可以参考下 2017-05-05
最新评论
大家感兴趣的内容
1超强、超详细Redis数据库入门教程2redis常用命令、常见错误、配置技巧等分享3Redis中5种数据结构的使用场景介绍4Redis操作命令总结5Redis 密码设置和查看密码的方法6推荐几款 Redis 可视化工具(太厉害了)764位Windows下安装Redis教程8redis 查看所有的key方式9redis中使用redis-dump导出、导入、还原数据实例10Redis中统计各种数据大小的方法
最近更新的内容
Redis高级数据类型Hyperloglog、Bitmap的使用Windows系统安装Redis的详细图文教程redis requires ruby version2.2.2的解决方案Redis分布式锁的正确实现方法总结Redis主从复制详解Redis 的 GeoHash详解Redis如何使用乐观锁(CAS)保证数据一致性浅谈Redis在分布式系统中的协调性运用一文详细介绍Redis7持久化机制RDB和AOFRedis集群的关闭与重启操作
常用在线小工具
微信
投稿
脚本任务
在线工具
关注微信公众号
关于我们 -
广告合作 -
联系我们 -
免责声明 -
网站地图 -
投诉建议 -
在线投稿
CopyRight 2006-2023 JB51.Net Inc All Rights Reserved. 脚本之家 版权所有