站内链接:

Stream

说明

基于 redis 的消息队列有好几种方式:

  1. 基于 list 队列的PUSH + POP, 这其中有好几种实现方式, 具体见redis 消息队列说明
  2. 基于 Sorted Set 的实现
  3. 订阅和发布模式
  4. Stream 实现

其中 Stream 是 redis5.0 新增的支持类型, 其支持消息队列大部分的功能:

  • 消息 ID 的序列化生成, 格式为: 时间戳-序号
  • 消息的遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 消息队列监控

在演示各个 Stream 命令之前先简单的介绍下 Stream 消息 ID 生成方式和格式:

  • 格式: 1666168208279-0, 时间戳 + 序号, 其中序号表示相同时间戳时(毫秒时间点内)多个消息记录而产生的序列化序号, 这两者都是 64 位整型
  • 相同时间戳多序号都是递增有序的, 一般在 multi 事务中执行时产生

基本命令

下面就从 redis 命令角度简单的演示下 Stream 类型操作, 首先时最基本的 CURD 操作.

  1. 创建和增加消息
1
2
3
4
5
6
7
# 1. 添加一条stream流数据, 其中消息内容使用如下是: key1 value1 key2 value2
# 注意, 这里ID用*代替, 表示由redis自动生成消息ID
XADD memberMessage * user bifeng msg Hello
XADD memberMessage * user xiaoyuan msg Beautiful

# 2. 记录id信息
XADD mystream1 * user_id 9000 venue_id 123 star_rating 3

好了, 上面我们往memberMessage消息队列中添加了两条消息, 那么我们怎么查看消息呢?

  1. 查看消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 1. 查看某一个消息队列中所有消息, 其会打印消息队列中每一个消息的ID以及信息, 这里0表示从第一条消息开始
xread streams memberMessage 0-0

# 2. 查看从0开始的前两条数据
xread count 2 streams memberMessage 0-0

# 3. 阻塞读取数据, 若无数据则阻塞等待直到超时, 默认不阻塞.
# 若ID为0, 队列中有数据则立刻返回
XREAD block 5000 streams memberMessage 0-0
XREAD count 1 block 5000 streams memberMessage 0-0
# 若ID为0, 队列中无数据则阻塞等待超时
XREAD block 5000 streams memberMessageEmpty 0-0
# 阻塞等待某个不存在的ID, 注意这个ID值必须比已有的ID大(时间戳-序号), 否则默认返回所有的已有数据
XREAD block 5000 streams memberMessage 1766173246980

# 阻塞等待获取最新的数据, 这里使用$, 这是阻塞等待最常用的方式
XREAD block 1000 streams memberMessage $
# 注意, 可以存在多个XREAD阻塞同一个消息队列, 一旦有消息, 则多个阻塞都会返回新推入的数据

# 4. 遍历获取所有的数据xrange(类似长度, redis中range命令格式均为: 前缀 + range), 这里-表示最小值, +表示最大值
xrange memberMessage - +
xrange memberMessage - + count 2

# 5. 反向查看消息, 注意+, -的位置变换了
XREVRANGE memberMessage + - count 2
  1. 删除和裁剪消息
1
2
3
4
5
6
7
8
9
# 1. 删除某一个指定ID的记录, 删除成功后再查看消息队列会发现少了一条该ID的记录
xdel memberMessage 1666168208279-3

# 2. 查看队列目前的长度(redis中的长度命令一般是: [前缀]+len)
xlen memberMessage

# 3. 对指定流进行修剪, 指定队列最大长度, 一旦指定就会发现之前消息队列中前面的记录丢失仅仅保留最后最大长度数据
# 注意, 这个最大长度仅仅对已有数据做删减, 后续仍然可以往消息队列中添加更多的数据
XTRIM memberMessage MAXLEN 2
  1. 队列简要信息
1
2
3
4
5
6
7
8
# 1. 获取指定队列或者消费组中的摘要和统计信息
xinfo stream mq

# 2. 获取消费组的摘要信息, mq表示队列名
xinfo groups mq

# 3. 获取消费组成员信息, 其中mqGroup表示消费组名字
xinfo consumers mq mqGroup

上面的命令输入如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
---->  第一条命令
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1666173318957-4"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1666173318957-0"
2) 1) "msg"
2) "1"
13) "last-entry"
14) 1) "1666173318957-4"
2) 1) "msg"
2) "5"

----> 第二条命令

1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 1
7) "last-delivered-id"
8) "1666173318957-0"

消费者组

上面已经简单介绍过消费组的摘要信息查看命令, 那么消费组提出的目的是什么呢? 默认情况下,一个队列中的消息可以被多个人同时消费, 但若存在如下需求:

  • a. 多个消费者协作消费同一个消息队列 M
  • b. 例如消息队列中有 9 条消息, 消费者 A 仅仅消费 1, 4, 7, 消费者 C 仅仅消费 3, 6, 9, 这样可以提高消费能力(即分流)

消费者组模式就是因此产生, 其提供了可以进行消费者组管理和操作的各种命令:

  • 消费者组实现同组多个消费者并行但不重复消费消息的能力, 提升消费
  • 消费者组能够记住最新消费的信息, 保证消息连续消费, 每一个 xgroup 都有一个last-delivered-id指向消息链表的某个元素, 类似散列链路
  • 消费者组能够记住消息转移次数, 实现消费失败重试以及永久性故障的消息转移
  • 消费者组提供了 PEL 未确认列表和 ACK 确认机制, 确保消息不丢失

下面就是消费者组的一些命令实战说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 1. 首先, 批量事务的往消息队列中push5条消息
multi
xadd mq * msg 1
xadd mq * msg 2
xadd mq * msg 3
xadd mq * msg 4
xadd mq * msg 5
exec

# 查看队列中的信息
xrange mq - +

# 2. 创建一个消费组, 其中0表示从第一条开始消费, 可以指定ID来设置起始消费消息
xgroup create mq mqGroup 0

# 3. 指定消费组A消费一条数据, 每次执行, 都会在消费组中记录该用户A已经消费的偏移量ID, 再次调用会返回下一条
# 其中>表示未被组内消费的起始消息
xreadgroup group mqGroup consumerA count 1 streams mq >

# 4. 在消费者A已经消费第一条信息之后, 该组中消费者B再次调用仍然返回"下一条消息", 其通过last_delivered_id来记录
xreadgroup group mqGroup consumerB count 1 streams mq >

由上可知, 同一个消费者组中的多个用户消费信息时存在互斥原则, 每次消费都更新last_delivered_id. 在使用 xreadgroup 从流中获取数据之后, 此时并未进行数据的 ACK 确认, 此时待处理条目中会记录这些被消费但是未ACK的数据信息, 通过命令: xpending mq mqGroup可以查看待处理条目信息:

1
2
3
4
5
6
# 这里表示由两条2消息未进行确认, 其中consumerA有两条信息未确认
1) (integer) 2
2) "1666257044906-0"
3) "1666257044906-2"
4) 1) 1) "consumerA"
2) "2"

其中书说明如下:

  • 1)表示行数
  • 2), 3)表示待处理条目中的最小和最大 ID
  • 4)中存储每一个用户至少一条待处理消息信息. 此时我们通过xack命令确认某一个用户的某一个消息记录, 则会发现XPENDING输出发生变化.

注意, xpending 返回的实际就是为进行 ACK 的消息信息, 该命令在异常场景中有这非常重要的意义.

1
2
3
4
5
6
7
8
9
# 1. 设置ack
xack mq mqGroup 1666257044906-2
xpending mq mqGroup

# 2. 查看xpending中只要信息: 消息ID, 消费者, 消费之后到现在的毫秒数, 消息传递次数
xpending mq mqGroup - + 10

# 3. 指定消费者
xpending mq mqGroup - + 10 consumerB

未 ack 消息

  1. 获取当前消息队列中未进行 ACK 的数量: xinfo consumers mq mqGroup, 根据输出中的 pending 的值判断未进行 ACK 的数量, 根据 min 和 max 获取最小和最大 ID 值
1
2
3
4
5
6
1) 1) "name"
2) "drcc"
3) "pending"
4) (integer) 67
5) "idle"
6) (integer) 2649798
  1. 调用 xrange 获取指定范围的数据: xrange stream1 minid1 maxid2 count 1000获取所有未确认的信息, 这种方式有点取消, 可能会对内存造成负担

  2. 重新处理每一条信息, 并对消息进行再次 ACK 确认, 当然如果再次发生异常就不会进行 ACK 确认.

list 队列

这里仅仅介绍一些不常见的列表命令, 比如阻塞相关命令, 介绍流程仍然同上面, 从最简单的 CURD 开始介绍列表命令

  1. 阻塞推入和弹出, 非阻塞命令这里就不再描述了, 阻塞命令可以避免空转现象, 减少网络请求, 每次命令执行都是往 redis 服务发送请求
1
2
3
4
5
6
7
8
9
10
11
12
13
# 原始数据, 返回当前list长度
lpush lmq 1 2 3 4 5 6

# 1. 阻塞弹出: BLPOP--这是LPOP的阻塞版本, BRPOP--这是LPOP阻塞版本
# -- 弹出值: 6
blpop lmq 1
# -- 弹出值: 1
rlpop lmq 2
# -- 不存在的队列, 阻塞直到超时, 注意, 这里超时单位为S
blpop lmq2 2

# 2. 同时阻塞多个key, 返回第一个非空数据, 0表示无限阻塞
blpop lmq lmq2 lmq3 0
  1. 原子性的返回并移除 Source 列表最后一个元素, 并把获取的元素存储到 DESTINATION 的头部
1
2
3
4
5
6
7
8
# 1. 先创建source
RPUSH mylist "one" "two" "three"

# 2. 弹出并推入destination
RPOPLPUSH mylist destlist

# 3. 查看dest: 发现three值
LRANGE destlist 0 -1

该命令的阻塞版本是BRPOPLPUSH, 阻塞命令类似RLPOP

  1. 插入, 索引, 移除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 1. 类似Python的下标索引操作, 注意, 下标是从0开始算
lrange lmq 0 -1
# --- 第3个
lindex lmq 2
# --- 最后一个
lindex lmq -1

# 2. 在某个基准值pivot的前面或者后面插入新的值
linsert mylist before "two" "two-before"
linsert mylist after "two" "two-after"

# 3. 移除队列中指定值的元素
# --- 移除前面3个值为two的元素
lrem mylist 3 "two"
# --- 移除后面3个值为two的元素
lrem mylist -3 "two"
# --- 移除队列中所有值为two的元素
lrem mylist 0 "two"
  1. 更新指定下标值, 切片操作
1
2
3
4
5
# 1. 更新指定下标值
lset mylist 1 "new-second"

# 2. 类似python语法, 对列表进行切片[0:2], 返回一个新的列表
ltrim mylist 0 2

有序集合

有序集合, 相比于 set 结构, 每个元素都有 score 分数来表示其在集合中的顺序, 有序集合也是作为 redis 消息队列的一种实现方式.

  1. 添加, 删除, 查看元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# a. 增加元素同时指定分数
zadd myset 1.1 '1.1score' 2 '2score'
# --- 输出value, 注意查询输出也是按照score从小到大输出的
zrange myset 0 -1
# --- 输出value, score信息
zrange myset 0 -1 WITHSCORES

# b. 获取集合中的个数, 注意不是ZLEN而是ZCARD
zcard myset
# --- 统计指定score值的集合个数, ZCARD的高级用法?
zcount myset -inf +inf
zcount myset 1 3

# 获取某个元素在队列总的排名
zrank myset "1.1score"
# 获取某个元素的分数
zscore myset "1.1score"

# c. 交互式命令: zinter, zinterstore

# d. 删除指定成员
zrem myset "1.1score"
  1. 更新
1
2
3
4
5
# a. 给指定的member的score值加上增量值, 其中增量值可以为正数, 也可以是负数
zincrby myset 8 "1.1score"

# b. 若member不存在, 则效果类似zadd
zincrby myset 3 "unexistkey"
  1. 获取区间元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# a. zrange根据下标返回指定区间内的元素, 排序: 按照score从低到高
zrange myset 0 -1
zrange myset 0 -1 withscores

# b. zrangebyscore根据score返回有序集合中确定score范围的元素
zrangebyscore myset 3 9
# 其中(表示开区间(小于或者大于), 默认情况下是小于等于或者大于等于
zrangebyscore myset (3 9

# c. zrevrange, 逆序
zrevrange myset 0 -1
zrevrange myset 0 -1 withscores

# d. zrevrangebyscore根据score逆序返回, 其用于消息延时队列
zrevrangebyscore myset 3 9

shell 命令

批量删除

我们可以通过 redis-cli 来批量的执行在redis command line上的命令, 其中频繁使用的就是批量删除, 使用简单的正则来批量的删除一些通用规则的减值. 在使用 redis-cli 执行命令的时候, 需要确保每一条命令的执行都有正确的访问权限.

例如, 批量删除以shenyuan开头的所有 redis 键值, 并且指定数据库, 指定密码, 指定 host:

1
2
3
4
# 使用keys匹配
redis-cli -h redis.example.com -p 6379 -a passwd -n 4 keys "shenyuan:*" | xargs redis-cli -h redis.example.com -p 6379 -a passwd -n 4 del
# 使用scan扫描命令
redis-cli -h redis.example.com -p 6379 -a passwd -n 4 --scan --pattern 'shenyuan:*' | xargs redis-cli -h redis.example.com -p 6379 -a passwd -n 4 DEL

注意, 请谨慎执行此类命令.

配置

Config

  1. SET

在 Redis 中,使用 CONFIG SET 命令可以动态地修改 Redis 的配置参数,其语法:CONFIG SET parameter value,其中,parameter是要修改的配置参数的名称,value是要设置的新值,在 redis 2.0 之后,一旦执行该命令,则配置就会立即生效。

配置文件

上面的 CONFIG 命令可以在 Redis 实例运行的时候修改某些配置,但该命令不支持所有的配置项,例如rename-command,此时就需要更改 redis.conf 配置文件并重启才能生效。

参考