站内链接:
1. 消息队列 消息队列是一种异步的服务间通信方式, 常常用于分布式和微服务框架中, 其有如下几个基本特点:
a. 消息在消费和删除之前一直在队列中存在
b. 每条消息仅可被一个用户处理一次
从每条消息的消费者数量来看, 消息队列模式分为两种:
a. P2P: 基于 list 的消息队列就是点对点
b. 发布订阅模式: 发布者可以向多个消息频道推送消息
在整个消息流动过程中有三个角色扮演者:
producer: 负责将消息推入消息队列
consumer: 负责从队列中获取消息并处理
broker: 消息处理中心, 负责消息的存储, ack, retry 等
在平常工作中消息队列是一种必不可少的技术, 比如:
a. 应用解耦, 微服务架构: 应用之间互相解耦, 通过消息来进行交互, 从而避免接口异常导致信息传递失败, 前者比后者更加稳定
b. 异步处理, 这在分布式场景中被使用, 例如 celery 的异步任务
c. 限流和削峰, 如果没有消息队列, 则在大并发直接到达服务器从而导致造成瘫痪, 通过消息队列将无法处理的消息先缓存起来
2. 基于 list 的消息队列 技术演进 最简单的消息队列就是lpush + rpop, rpush + lpop
命令, redis 中的消息队列结构是链表, 所以这个操作代价是非常小的, 但是该方案存在缺点:
a. 实时消费问题, 轮询问题, rpop 是非阻塞调用, 如果队列为空, 则消费者需要不断轮询队列
b. 消息确认机制缺失, 一旦客户端发生宕机就会造成消息丢失
c. 数据无权重概念, 只支持先进先出
为了解决轮询问题提出方案 2, 其使用阻塞弹出命令来减少不必要的轮询, 当然这样做的前提是有一个独立的线程在阻塞等待消费. 阻塞消息队列基于lpush + brpop
来实现, 注意, 方案 2 仅仅解决了:
为了解决即时消费问题提出方案 3: lpush + lrange + rpop
, 其在客户端真正消费完成之前使用lrange
命令做读取但不消费操作, 但是该方案又产生了新的问题:
a. lrange 非阻塞, 故而产生了方案 1 中提出的问题: 不断轮询损耗性能问题
b. 重复消费问题, 客户端发生异常重启之后, 此时会重新lrange
已读取过但未被消费的信息
方案 3 的问题根本上是缺少消息 ACK 机制, 为此提出了可靠队列
模式: LPUSH+BRPOPLPUSH+LREM
, 其中BRPOPLPUSH
会把从 needConsume 消息队列中读取消息并备份到另外一个 doingConsume 消息队列中, 这个过程是原子性的, 一旦消息处理完毕就使用lrem
将消息从 doingConsume 中删除消息, 否则会在客户端重启之后重新从 doingConsume 中读取消息并处理. 这种思想类似分治或者分层策略. 另外, 对于重复消费问题, 需要在全局对消息设置唯一 ID, 没消费处理完成一条消息就缓存, 避免重复消费.
实战
方案 1:
1 2 3 4 5 lpush lrlist 1 2 3 4 5 rpop lrlist
方案 2:
1 2 3 4 5 6 lpush lbrlist 1 2 3 4 5 brpop lbrlist 10 brpop lbrlist 0
方案 3:
1 2 3 4 5 6 7 8 lpush lrrlist 1 2 3 4 5 lrange lrrlist -1 -1 rpop lrrlist
方案 4
1 2 3 4 5 6 7 8 lpush ltwolist 1 2 3 4 5 rpoplpush ltwolist ltwobak lrange ltwobak 0 -1
延时消息队列 使用场景 延时队列: 其具有队列的特性, 同时还具有延时消费的功能, 指定队列中的消息的时间信息以便后续延时消费. 在一些订单管理有时效性的场景中使用比较频繁, 比如:
数据中台统一对采集数据进行批量处理, 其能够接收一定的延时性, 确保所有的数据在一定时间之后都能存储到延时队列中并统一处理
订单管理中对于下单 15 分钟未付款的账单进行取消并回复库存操作
注册超过 30 天但是未邮箱验证的客户进行二次确认
一个合格的延时消息队列有如下特点:
每个消息都有时间戳或者 score 概念
消息在延时队列中依靠 score(时间戳)进行排序
消息队列可以通过 score 范围获取和删除队列中的元素, 这个过程必须是原子性的
基于 sorted set 的延时队列 通过zadd + zrevrangebyscore + zrem
实现延时队列
1 2 3 4 5 6 7 8 zadd delaySet 1000001 "uid00001" 1000010 "uid00002" 1000015 "uid00003" zrevrangebyscore delaySet 1000030 1000010 zrem delaySet "uid00002"
发布和订阅 场景 pub/sub
与上面的基于列表消息队列, 延时消息队列都不同, 从使用场景和使用方式上都是不同的:
a. 支持一个消息多个消费者或订阅者
b. 消息到达客户端不是使用轮询的方式而是采用推送的方式, 但是客户端仍然需要有监控进程
这种消息的推送类似某些监控接口的两种实现方式: 前端定时轮询接口
, 基于websocket的消息推送
, 很明显后者在频繁调用的, 多客户端(非大并发)的场景中性能更加有优势. 在pub/sub
中有几个重要的名词或者角色:
发布者: 向指定的频道推送消息, 这是一对多的
订阅者: 订阅一个或者多个频道, 自身还支持通过 GLOB 匹配模式来进行模式订阅
频道: 某个特定的消息队列
模式: 基于 glob 匹配规则的频道模式
最后, sub/pub
模式并非完美, 其仍然有一些待解决的问题:
数据非持久化, 一旦宕机或网络断开, 则消息就会丢失
无 ACK 机制来确保数据的可靠性
redis 的发布和订阅 在介绍 redis 的发布和订阅之前先简单的介绍下消息的格式, 其中频道和模式订阅命令的消息格式有一些区别, 在我们订阅某个频道之后会输出如下信息:
1 2 3 1) "subscribe" # 表示信息类型, subscribe-订阅, unsubscribe-取消订阅, message-消息 2) "subsecond" # 表示频道名 3) (integer) 1 # 表示统计信息(数量)或信息, 在message类型中值为消息
通过subscribe + publish
实现消息的订阅和发布, 其输出相比频道多了一条信息:
1 2 3 4 5 6 7 8 9 10 ------订阅------ 1) "psubscribe" 2) "sub*" 3) (integer) 1 ------消息------ 1) "pmessage" 2) "sub*" 3) "subsecond" 4) "hellow"
其中频道和模式的命令测试如下:
1 2 3 4 subscribe subfirst subsecond publish subsecond hello
通过模式匹配psubscribe + publish
可以让客户端匹配多个频道
1 2 3 4 5 psubscribe sub* publish subsecond secondmsg publish subfirst firstmsg
python 实例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 import sysimport getoptimport timeimport redisdef usage (): """usage""" usage_doc = """消息的发布和订阅 Usage: python redis_pubsub.py -option -m [MODULES] -s M -d N Options: -h or --help --action 动作, sub-订阅, pub-发布 For more info visit http://www.unlessbamboo.top/""" print (usage_doc) def get_cli_options (argv ): """get_cli_options:get command options :param argv: """ short_opt = "" long_opt = ["action=" ] try : opts, args = getopt.getopt(argv[1 :], short_opt, long_opt) except getopt.GetoptError as msg: print ("Occur error, msg:{0}" .format (msg)) usage() sys.exit(1 ) if not opts: usage() sys.exit(1 ) return opts[0 ][1 ] class RedisHelper : """ RedisHelper """ def __init__ (self ): self.__conn = redis.Redis(host='127.0.0.1' , port=6379 ) self.chan_sub = 'channel1' self.chan_pub = 'channel1' def publish (self, msg ): self.__conn.publish(self.chan_pub, msg) return True def subscribe (self ): pub = self.__conn.pubsub() pub.subscribe(self.chan_pub) pub.parse_response() return pub def sub_channel (): obj = RedisHelper() redis_sub = obj.subscribe() print ('-开始订阅并等待消息-' ) while True : msg = redis_sub.parse_response() if not msg: continue print (msg[2 ].decode('utf8' )) def pub_msg (): """ 将当前时间戳作为消息发布出去 """ obj = RedisHelper() curtime = time.ctime() obj.publish(f'hello, {curtime} ' ) if __name__ == '__main__' : """ 测试命令 1. 启动订阅(阻塞等待): python redis_pubsub.py --action=sub 2. 推送消息: python redis_pubsub.py --action=pub """ action = get_cli_options(sys.argv) if action == 'sub' : sub_channel() else : pub_msg()
stream 消息 机制和场景 上面已经介绍过基于 list 的简单消息队列, 基于 ordered set 的延时消息队列, 发布和订阅消息, 他们都有着各自的缺点:
List 没有消息多播功能, 没有 ACK 机制, 无法重复消费
pub 和 sub 消息无法持久化, 一旦网络断开或者宕机则数据丢失, 无 ACK 机制
Sorted Set 不支持阻塞式获取消息, 不允许重复消费, 不支持分组
除了上述三者之外, 在 redis 5.0 新增了一个更加强大的数据结构–Stream, 其提供了如下功能:
消息的持久化: 让任何消费者访问任何时刻的历史消息, 每个消息都有唯一的 ID 和对应的内容, 客户端可以访问任意时刻的数据
消息多播功能: 同一个消息可被分发给多个单消费者和消费者组
阻塞: 提供了对于消费者和消费者组的阻塞, 非阻塞获取信息功能
消费者组: 提供了复杂的功能
每一个 Stream 都有自己唯一的名称, 关于 stream 命令见redis 命令 中的介绍. Stream 的设计参考了 kafka, 其中一些专业术语介绍如下:
Consumer Group: 每个消费组状态独立, 互不影响
last_delivered_id: 每个消费组会有个游标 last_delivered_id 在数组之上往前移动
pending_ids: 消费者的状态变量, 作用是维护消费者的未确认的 id, 其记录了当前被客户端读取但是未 ACK 的消息
简单读写实例 这里没有
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 def get_data (redis_connection ): """ 读取stream消息: xread count 2 streams streamKey 0-0 """ last_id = 0 sleep_ms = 5000 while True : try : resp = redis_connection.xread( {stream_key: last_id}, count=1 , block=sleep_ms ) if resp: key, messages = resp[0 ] last_id, data = messages[0 ] print ("REDIS ID: " , last_id) print (" --> " , data) except ConnectionError as e: print ("ERROR REDIS CONNECTION: {}" .format (e)) def send_data (redis_connection, max_messages ): """ 往消息队列中推送消息: @命令: xadd streamKey * key1 msg1 key2 msg2 """ count = 0 while count < max_messages: try : data = { "producer" : producer, "some_id" : uuid4().hex , "count" : count, } resp = redis_connection.xadd(stream_key, data) print (f'推送消息:{resp} ' ) count += 1 except ConnectionError as e: print ("ERROR REDIS CONNECTION: {}" .format (e)) sleep(0.5 )
xgroup 实例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 def create_xgroup (rds, stream, group_name ): """ 创建消费者组: xgroup create ${STREAM_KEY} bambooGroup 0 """ try : rsp = rds.xinfo_groups(stream) for item in rsp: if group_name == item['name' ].decode('utf8' ): print (f'xgroup:{group_name} 已存在, 无需再次创建' ) return except Exception as msg: pass group = rds.xgroup_create(stream, group_name, '0-0' , mkstream=True ) if not group: raise Exception(f'创建消费者组:{group_name} 异常' ) return group def get_data (rds, stream_key, group_name ): """ xreadgroup group mqGroup consumerA count 1 streams mq > """ count = 0 while True : try : streams = { stream_key: '>' , } msgs = rds.xreadgroup(group_name, 'userA' , streams, block=10 ) if not msgs: continue for msg in msgs: print (msg) except ConnectionError as e: print ("ERROR REDIS CONNECTION: {}" .format (e)) sleep(0.5 ) def send_data (rds, stream_key ): count = 0 while count < MAX_MESSAGES: try : data = { "producer" : 'xgroup' , "some_id" : uuid4().hex , "count" : count, } resp = rds.xadd(stream_key, data) print (f'推送消息:{resp} ' ) count += 1 except ConnectionError as e: print ("ERROR REDIS CONNECTION: {}" .format (e)) sleep(0.5 )
参考