Redis Stream实现
Redis Stream概述
1. 什么是Redis Stream
graph TD
A[获取当前时间戳] --> B[获取序列号]
B --> C[组合ID]
C --> D{自定义ID?}
D -->|是| E[使用自定义ID]
D -->|否| F[使用自动生成ID]
Redis Stream是Redis 5.0引入的一个新的数据类型,它是一个基于追加模式的消息队列实现。
2. Stream的特点
- 消息持久化
- 消息ID自增
- 消费者组支持
- 消息确认机制
- 历史消息查询
Stream数据结构
1. 消息结构
graph TD
A[Stream消息] --> B[消息ID]
B --> C[字段1]
C --> D[字段2]
D --> E[字段N]
2. 消费者组结构
graph TD
A[消费者组] --> B[组名]
B --> C[最后投递ID]
C --> D[消费者列表]
D --> E[待处理消息]
Stream基本操作
1. 消息生产
# 添加消息
XADD mystream * sensor-id 1234 temperature 19.8 humidity 80
# 批量添加
XADD mystream * sensor-id 1234 temperature 19.8 humidity 80
XADD mystream * sensor-id 1234 temperature 20.1 humidity 81
2. 消息消费
# 读取消息
XREAD COUNT 2 STREAMS mystream 0
# 消费者组消费
XGROUP CREATE mystream mygroup 0
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
3. 消息确认
# 确认消息
XACK mystream mygroup 1526569495631-0
# 查看待确认消息
XPENDING mystream mygroup
Stream高级特性
1. 消息ID生成
graph TD
A[获取当前时间戳] --> B[获取序列号]
B --> C[组合ID]
C --> D{自定义ID?}
D -->|是| E[使用自定义ID]
D -->|否| F[使用自动生成ID]
2. 消费者组管理
graph TD
subgraph 消费者组管理
create[创建组] --> delete[删除组]
delete --> setid[设置ID]
setid --> destroy[销毁组]
end
3. 消息处理流程
graph TD
A[生产者发送消息] --> B[消息进入Stream]
B --> C[消费者组分配消息]
C --> D[消费者处理消息]
D --> E{处理成功?}
E -->|是| F[确认消息]
E -->|否| G[进入待处理队列]
G --> H[重试处理]
Stream应用场景
1. 日志收集
graph TD
subgraph 日志收集
app[应用日志] --> Stream
sys[系统日志] --> Stream
sec[安全日志] --> Stream
end
2. 事件追踪
graph TD
subgraph 事件追踪
user[用户行为] --> Stream
system[系统事件] --> Stream
business[业务事件] --> Stream
end
3. 消息通知
graph TD
subgraph 消息通知
email[邮件通知] --> Stream
sms[短信通知] --> Stream
push[推送通知] --> Stream
end
最佳实践
1. 性能优化
- 使用批量操作
- 合理设置消费者数量
- 及时确认消息
- 定期清理过期消息
2. 可靠性保证
- 实现消息重试
- 处理死信消息
- 监控消息积压
- 备份重要消息
3. 运维建议
- 监控Stream长度
- 设置告警阈值
- 定期维护消费者组
- 清理过期数据