站内链接:

1. 消息队列

消息队列是一种异步的服务间通信方式, 常常用于分布式和微服务框架中, 其有如下几个基本特点:

  • a. 消息在消费和删除之前一直在队列中存在
  • b. 每条消息仅可被一个用户处理一次

从每条消息的消费者数量来看, 消息队列模式分为两种:

  • a. P2P: 基于 list 的消息队列就是点对点
  • b. 发布订阅模式: 发布者可以向多个消息频道推送消息

在整个消息流动过程中有三个角色扮演者:

  • producer: 负责将消息推入消息队列
  • consumer: 负责从队列中获取消息并处理
  • broker: 消息处理中心, 负责消息的存储, ack, retry 等

在平常工作中消息队列是一种必不可少的技术, 比如:

  • a. 应用解耦, 微服务架构: 应用之间互相解耦, 通过消息来进行交互, 从而避免接口异常导致信息传递失败, 前者比后者更加稳定
  • b. 异步处理, 这在分布式场景中被使用, 例如 celery 的异步任务
  • c. 限流和削峰, 如果没有消息队列, 则在大并发直接到达服务器从而导致造成瘫痪, 通过消息队列将无法处理的消息先缓存起来

2. 基于 list 的消息队列

技术演进

最简单的消息队列就是lpush + rpop, rpush + lpop命令, redis 中的消息队列结构是链表, 所以这个操作代价是非常小的, 但是该方案存在缺点:

  • a. 实时消费问题, 轮询问题, rpop 是非阻塞调用, 如果队列为空, 则消费者需要不断轮询队列
  • b. 消息确认机制缺失, 一旦客户端发生宕机就会造成消息丢失
  • c. 数据无权重概念, 只支持先进先出

为了解决轮询问题提出方案 2, 其使用阻塞弹出命令来减少不必要的轮询, 当然这样做的前提是有一个独立的线程在阻塞等待消费. 阻塞消息队列基于lpush + brpop来实现, 注意, 方案 2 仅仅解决了:

  • 轮询问题: 阻塞等待

为了解决即时消费问题提出方案 3: lpush + lrange + rpop, 其在客户端真正消费完成之前使用lrange命令做读取但不消费操作, 但是该方案又产生了新的问题:

  • a. lrange 非阻塞, 故而产生了方案 1 中提出的问题: 不断轮询损耗性能问题
  • b. 重复消费问题, 客户端发生异常重启之后, 此时会重新lrange已读取过但未被消费的信息

方案 3 的问题根本上是缺少消息 ACK 机制, 为此提出了可靠队列模式: LPUSH+BRPOPLPUSH+LREM, 其中BRPOPLPUSH会把从 needConsume 消息队列中读取消息并备份到另外一个 doingConsume 消息队列中, 这个过程是原子性的, 一旦消息处理完毕就使用lrem将消息从 doingConsume 中删除消息, 否则会在客户端重启之后重新从 doingConsume 中读取消息并处理. 这种思想类似分治或者分层策略.
另外, 对于重复消费问题, 需要在全局对消息设置唯一 ID, 没消费处理完成一条消息就缓存, 避免重复消费.

实战

  1. 方案 1:
1
2
3
4
5
# a. 生产
lpush lrlist 1 2 3 4 5

# b. 消费
rpop lrlist
  1. 方案 2:
1
2
3
4
5
6
# a. 生产
lpush lbrlist 1 2 3 4 5

# b. 消费, 设置超时时间
brpop lbrlist 10
brpop lbrlist 0
  1. 方案 3:
1
2
3
4
5
6
7
8
# a. 生产
lpush lrrlist 1 2 3 4 5

# b. 读取
lrange lrrlist -1 -1

# c. 消费
rpop lrrlist
  1. 方案 4
1
2
3
4
5
6
7
8
# a. 生产
lpush ltwolist 1 2 3 4 5

# b. 读取并备份
rpoplpush ltwolist ltwobak

# c. 查看备份队列
lrange ltwobak 0 -1

延时消息队列

使用场景

延时队列: 其具有队列的特性, 同时还具有延时消费的功能, 指定队列中的消息的时间信息以便后续延时消费. 在一些订单管理有时效性的场景中使用比较频繁, 比如:

  • 数据中台统一对采集数据进行批量处理, 其能够接收一定的延时性, 确保所有的数据在一定时间之后都能存储到延时队列中并统一处理
  • 订单管理中对于下单 15 分钟未付款的账单进行取消并回复库存操作
  • 注册超过 30 天但是未邮箱验证的客户进行二次确认

一个合格的延时消息队列有如下特点:

  • 每个消息都有时间戳或者 score 概念
  • 消息在延时队列中依靠 score(时间戳)进行排序
  • 消息队列可以通过 score 范围获取和删除队列中的元素, 这个过程必须是原子性的

基于 sorted set 的延时队列

通过zadd + zrevrangebyscore + zrem实现延时队列

1
2
3
4
5
6
7
8
# a. 添加延时消息
zadd delaySet 1000001 "uid00001" 1000010 "uid00002" 1000015 "uid00003"

# b. 获取指定分数的元素
zrevrangebyscore delaySet 1000030 1000010

# c. 在消息全部处理完成之后删除
zrem delaySet "uid00002"

发布和订阅

场景

pub/sub与上面的基于列表消息队列, 延时消息队列都不同, 从使用场景和使用方式上都是不同的:

  • a. 支持一个消息多个消费者或订阅者
  • b. 消息到达客户端不是使用轮询的方式而是采用推送的方式, 但是客户端仍然需要有监控进程

这种消息的推送类似某些监控接口的两种实现方式: 前端定时轮询接口, 基于websocket的消息推送, 很明显后者在频繁调用的, 多客户端(非大并发)的场景中性能更加有优势. 在pub/sub中有几个重要的名词或者角色:

  • 发布者: 向指定的频道推送消息, 这是一对多的
  • 订阅者: 订阅一个或者多个频道, 自身还支持通过 GLOB 匹配模式来进行模式订阅
  • 频道: 某个特定的消息队列
  • 模式: 基于 glob 匹配规则的频道模式

最后, sub/pub模式并非完美, 其仍然有一些待解决的问题:

  • 数据非持久化, 一旦宕机或网络断开, 则消息就会丢失
  • 无 ACK 机制来确保数据的可靠性

redis 的发布和订阅

在介绍 redis 的发布和订阅之前先简单的介绍下消息的格式, 其中频道和模式订阅命令的消息格式有一些区别, 在我们订阅某个频道之后会输出如下信息:

1
2
3
1) "subscribe"      # 表示信息类型, subscribe-订阅, unsubscribe-取消订阅, message-消息
2) "subsecond" # 表示频道名
3) (integer) 1 # 表示统计信息(数量)或信息, 在message类型中值为消息

通过subscribe + publish实现消息的订阅和发布, 其输出相比频道多了一条信息:

1
2
3
4
5
6
7
8
9
10
------订阅------
1) "psubscribe"
2) "sub*"
3) (integer) 1

------消息------
1) "pmessage"
2) "sub*"
3) "subsecond"
4) "hellow"

其中频道和模式的命令测试如下:

1
2
3
4
# a. 订阅指定的频道
subscribe subfirst subsecond
# b. 发布消息
publish subsecond hello

通过模式匹配psubscribe + publish可以让客户端匹配多个频道

1
2
3
4
5
# a. 订阅
psubscribe sub*
# b. 发布
publish subsecond secondmsg
publish subfirst firstmsg

python 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import sys
import getopt
import time
import redis


def usage():
"""usage"""
usage_doc = """消息的发布和订阅
Usage:
python redis_pubsub.py -option -m [MODULES] -s M -d N

Options:
-h or --help
--action
动作, sub-订阅, pub-发布

For more info visit http://www.unlessbamboo.top/"""
print(usage_doc)


def get_cli_options(argv):
"""get_cli_options:get command options

:param argv:
"""
short_opt = ""
long_opt = ["action="]
try:
opts, args = getopt.getopt(argv[1:], short_opt, long_opt)
except getopt.GetoptError as msg:
print("Occur error, msg:{0}".format(msg))
usage()
sys.exit(1)
if not opts:
usage()
sys.exit(1)
return opts[0][1]


class RedisHelper:
"""
RedisHelper
"""
def __init__(self):
self.__conn = redis.Redis(host='127.0.0.1', port=6379)
self.chan_sub = 'channel1'
self.chan_pub = 'channel1'

def publish(self, msg):
self.__conn.publish(self.chan_pub, msg)
return True

def subscribe(self):
pub = self.__conn.pubsub()
pub.subscribe(self.chan_pub)
pub.parse_response()
return pub


def sub_channel():
obj = RedisHelper()
redis_sub = obj.subscribe()
print('-开始订阅并等待消息-')
while True:
msg = redis_sub.parse_response()
if not msg:
continue
print(msg[2].decode('utf8'))


def pub_msg():
""" 将当前时间戳作为消息发布出去 """
obj = RedisHelper()
curtime = time.ctime()
obj.publish(f'hello, {curtime}')


if __name__ == '__main__':
""" 测试命令
1. 启动订阅(阻塞等待): python redis_pubsub.py --action=sub
2. 推送消息: python redis_pubsub.py --action=pub
"""
action = get_cli_options(sys.argv)
if action == 'sub':
sub_channel()
else:
pub_msg()

stream 消息

机制和场景

上面已经介绍过基于 list 的简单消息队列, 基于 ordered set 的延时消息队列, 发布和订阅消息, 他们都有着各自的缺点:

  • List 没有消息多播功能, 没有 ACK 机制, 无法重复消费
  • pub 和 sub 消息无法持久化, 一旦网络断开或者宕机则数据丢失, 无 ACK 机制
  • Sorted Set 不支持阻塞式获取消息, 不允许重复消费, 不支持分组

除了上述三者之外, 在 redis 5.0 新增了一个更加强大的数据结构–Stream, 其提供了如下功能:

  • 消息的持久化: 让任何消费者访问任何时刻的历史消息, 每个消息都有唯一的 ID 和对应的内容, 客户端可以访问任意时刻的数据
  • 消息多播功能: 同一个消息可被分发给多个单消费者和消费者组
  • 阻塞: 提供了对于消费者和消费者组的阻塞, 非阻塞获取信息功能
  • 消费者组: 提供了复杂的功能

每一个 Stream 都有自己唯一的名称, 关于 stream 命令见redis 命令中的介绍. Stream 的设计参考了 kafka, 其中一些专业术语介绍如下:

  • Consumer Group: 每个消费组状态独立, 互不影响
  • last_delivered_id: 每个消费组会有个游标 last_delivered_id 在数组之上往前移动
  • pending_ids: 消费者的状态变量, 作用是维护消费者的未确认的 id, 其记录了当前被客户端读取但是未 ACK 的消息

简单读写实例

这里没有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 1. 消费者
def get_data(redis_connection):
""" 读取stream消息: xread count 2 streams streamKey 0-0 """
last_id = 0
sleep_ms = 5000
while True:
try:
resp = redis_connection.xread(
{stream_key: last_id}, count=1, block=sleep_ms
)
if resp:
key, messages = resp[0]
last_id, data = messages[0]
print("REDIS ID: ", last_id)
print(" --> ", data)

except ConnectionError as e:
print("ERROR REDIS CONNECTION: {}".format(e))

# 2. 生产者
def send_data(redis_connection, max_messages):
""" 往消息队列中推送消息:
@命令: xadd streamKey * key1 msg1 key2 msg2
"""
count = 0
while count < max_messages:
try:
data = {
"producer": producer,
"some_id": uuid4().hex, # Just some random data
"count": count,
}
resp = redis_connection.xadd(stream_key, data)
print(f'推送消息:{resp}')
count += 1

except ConnectionError as e:
print("ERROR REDIS CONNECTION: {}".format(e))
sleep(0.5)

xgroup 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 1. 创建消费者组
def create_xgroup(rds, stream, group_name):
""" 创建消费者组: xgroup create ${STREAM_KEY} bambooGroup 0 """
try:
rsp = rds.xinfo_groups(stream)
for item in rsp:
if group_name == item['name'].decode('utf8'):
print(f'xgroup:{group_name}已存在, 无需再次创建')
return
except Exception as msg:
pass

group = rds.xgroup_create(stream, group_name, '0-0', mkstream=True)
if not group:
raise Exception(f'创建消费者组:{group_name}异常')
return group


# 2. 消费者
def get_data(rds, stream_key, group_name):
""" xreadgroup group mqGroup consumerA count 1 streams mq > """
count = 0
while True:
try:
streams = {
stream_key: '>',
}
msgs = rds.xreadgroup(group_name, 'userA', streams, block=10)
if not msgs:
continue
for msg in msgs:
print(msg)
except ConnectionError as e:
print("ERROR REDIS CONNECTION: {}".format(e))
sleep(0.5)

# 3. 生产者
def send_data(rds, stream_key):
count = 0
while count < MAX_MESSAGES:
try:
data = {
"producer": 'xgroup',
"some_id": uuid4().hex, # Just some random data
"count": count,
}
resp = rds.xadd(stream_key, data)
print(f'推送消息:{resp}')
count += 1

except ConnectionError as e:
print("ERROR REDIS CONNECTION: {}".format(e))
sleep(0.5)

参考