Java操作Redis Stream
环境配置
1. 依赖配置
<!-- Spring Boot starter for Spring Data Redis. No <version> is declared here,
     so it is presumably managed by the Spring Boot parent/BOM (confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Apache Commons Pool 2. Presumably included to enable client connection
     pooling (the best-practice notes below recommend a connection pool);
     verify that pool settings are actually configured. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
2. 配置类
/**
 * Redis configuration exposing a {@code RedisTemplate<String, Object>} that
 * stores keys as plain strings and values as JSON.
 */
@Configuration
public class RedisConfig {

    /**
     * Builds the application-wide {@link RedisTemplate}.
     *
     * <p>Besides the key/value serializers, the hash key/value serializers are
     * configured as well. Hash and stream operations serialize field names and
     * field values with the hash serializers; leaving them unset falls back to
     * the template's default (JDK serialization), which writes opaque binary
     * fields that other clients cannot read.
     *
     * @param factory connection factory the template obtains connections from
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer();

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // Keys (including stream keys) are stored as plain strings.
        template.setKeySerializer(stringSerializer);
        // Values are stored as JSON.
        template.setValueSerializer(jsonSerializer);

        // Field names / field values of hashes and stream entries.
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(jsonSerializer);
        return template;
    }
}
基础操作
1. 消息生产
/**
 * Appends entries to Redis Streams through a {@link StringRedisTemplate}.
 *
 * <p>NOTE(review): {@code StreamOperations.add} is documented to return
 * {@code null} when used inside a pipeline or transaction; these methods assume
 * they are called outside of one.
 */
@Service
public class StreamProducer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Appends one entry to {@code stream} and lets Redis generate the entry ID.
     *
     * <p>To supply an explicit ID instead, build a record and pass it to
     * {@code add(record)}, e.g.
     * {@code StreamRecords.newRecord().in(stream).withId("1700000000000-0").ofMap(message)}.
     * Redis only accepts IDs of the form {@code <millis>-<sequence>}; an
     * arbitrary string such as {@code "custom-id"} is rejected.
     *
     * @param stream  key of the stream to append to
     * @param message field/value pairs of the entry
     * @return the ID of the appended entry
     */
    public String produceMessage(String stream, Map<String, String> message) {
        // add(key, map) returns a RecordId; callers of this method expect the
        // textual "<millis>-<sequence>" form.
        return redisTemplate.opsForStream()
                .add(stream, message)
                .getValue();
    }

    /**
     * Appends several entries to {@code stream}, in list order.
     *
     * <p>Spring Data Redis has no multi-entry {@code add}; each message is sent
     * as its own append, so the batch is not atomic.
     *
     * @param stream   key of the stream to append to
     * @param messages entries to append
     * @return the generated entry IDs, in the same order as {@code messages}
     */
    public List<String> produceBatchMessages(String stream, List<Map<String, String>> messages) {
        List<String> ids = new java.util.ArrayList<>(messages.size());
        for (Map<String, String> message : messages) {
            ids.add(produceMessage(stream, message));
        }
        return ids;
    }
}
2. 消息消费
/**
 * Reads entries from a Redis Stream on behalf of a consumer group and
 * acknowledges them once processed.
 */
@Service
public class StreamConsumer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Reads entries of {@code stream} that have not yet been delivered to any
     * consumer of {@code group} (read offset "last consumed").
     *
     * @param stream   key of the stream to read from
     * @param group    consumer group name
     * @param consumer name of the consumer within the group
     * @return the entries read; empty (never {@code null}) when there is nothing to read
     */
    public List<MapRecord<String, Object, Object>> consumeMessages(String stream, String group, String consumer) {
        List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                .read(Consumer.from(group, consumer),
                        StreamReadOptions.empty(),
                        StreamOffset.create(stream, ReadOffset.lastConsumed()));
        // read(...) is nullable; hand callers an empty list so they can iterate safely.
        if (records == null) {
            return java.util.Collections.emptyList();
        }
        return records;
    }

    /**
     * Acknowledges one entry so it is removed from the group's pending list.
     *
     * @param stream    key of the stream the entry belongs to
     * @param group     consumer group that received the entry
     * @param messageId ID of the entry to acknowledge
     */
    public void acknowledgeMessage(String stream, String group, String messageId) {
        // Argument order is (key, group, ids...): the stream key comes first.
        redisTemplate.opsForStream()
                .acknowledge(stream, group, messageId);
    }
}
高级特性实现
1. 消费者组管理
/**
 * Creates, removes and inspects consumer groups of a Redis Stream.
 */
@Service
public class StreamGroupManager {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Creates consumer group {@code group} on {@code stream}; a group that
     * already exists is left untouched.
     *
     * @param stream key of the stream
     * @param group  name of the group to create
     * @throws RuntimeException for any failure other than "group already exists"
     */
    public void createGroup(String stream, String group) {
        try {
            redisTemplate.opsForStream()
                    .createGroup(stream, group);
        } catch (RuntimeException e) {
            // Only the "already exists" reply is expected; connection problems,
            // wrong key type etc. must still reach the caller.
            if (!isGroupAlreadyExists(e)) {
                throw e;
            }
        }
    }

    /**
     * Removes consumer group {@code group} from {@code stream}.
     *
     * @param stream key of the stream
     * @param group  name of the group to remove
     */
    public void deleteGroup(String stream, String group) {
        redisTemplate.opsForStream()
                .destroyGroup(stream, group);
    }

    /**
     * Looks up the info entry of one consumer group.
     *
     * @param stream key of the stream
     * @param group  name of the group to look up
     * @return the group's info, or {@code null} if the stream has no group of that name
     */
    public StreamInfo.XInfoGroup getGroupInfo(String stream, String group) {
        return redisTemplate.opsForStream()
                .groups(stream)
                .stream()
                .filter(g -> g.groupName().equals(group))
                .findFirst()
                .orElse(null);
    }

    /**
     * Tells whether {@code error}, or any exception in its cause chain, carries
     * the Redis {@code BUSYGROUP} reply that is returned when the group exists.
     */
    private static boolean isGroupAlreadyExists(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            String text = t.getMessage();
            if (text != null && text.contains("BUSYGROUP")) {
                return true;
            }
        }
        return false;
    }
}
2. 死信队列处理
/**
 * Moves entries that could not be processed into a dedicated dead-letter stream.
 */
@Service
public class DeadLetterQueueHandler {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String DEAD_LETTER_QUEUE = "dead-letter-queue";

    /**
     * Copies the entry {@code messageId} of {@code stream} into the dead-letter
     * stream and acknowledges the original for {@code group}.
     *
     * <p>If the entry can no longer be found in the stream, nothing is copied,
     * but the ID is still acknowledged; otherwise it would remain in the
     * group's pending list indefinitely.
     *
     * @param stream    key of the stream holding the failed entry
     * @param group     consumer group the entry is pending for
     * @param messageId ID of the failed entry
     */
    public void handleDeadLetter(String stream, String group, String messageId) {
        StreamOperations<String, Object, Object> ops = redisTemplate.opsForStream();

        // Fetch the entry body: a closed range [messageId, messageId] matches at most one entry.
        Map<Object, Object> message = ops
                .range(stream, Range.from(Range.Bound.inclusive(messageId))
                        .to(Range.Bound.inclusive(messageId)))
                .stream()
                .findFirst()
                .map(MapRecord::getValue)
                .orElse(null);

        if (message != null) {
            // Append a copy to the dead-letter stream (Redis assigns a new ID).
            ops.add(DEAD_LETTER_QUEUE, message);
        }

        // Acknowledge the original; argument order is (key, group, ids...).
        ops.acknowledge(stream, group, messageId);
    }
}
3. 消息重试机制
/**
 * Redelivers failed stream entries a limited number of times and hands them to
 * the {@link DeadLetterQueueHandler} once the limit is reached.
 *
 * <p>The retry count of an entry is kept in the string key
 * {@code "retry:" + entryId}.
 *
 * <p>NOTE(review): the counter keys carry no expiry; the key of an entry that
 * eventually succeeds is never removed by this class. Consider a TTL.
 */
@Service
public class MessageRetryHandler {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private DeadLetterQueueHandler deadLetterQueueHandler;

    private static final int MAX_RETRY_COUNT = 3;
    private static final String RETRY_KEY_PREFIX = "retry:";

    /**
     * Retries the entry {@code messageId}, or dead-letters it when it has
     * already been retried {@code MAX_RETRY_COUNT} times.
     *
     * <p>A retry appends a copy of the entry to the same stream (so it is
     * delivered again under a new ID), carries the incremented retry count over
     * to that new ID, and acknowledges the original so it leaves the pending
     * list. The steps are separate Redis commands and therefore not atomic.
     *
     * @param stream    key of the stream holding the failed entry
     * @param group     consumer group the entry is pending for
     * @param messageId ID of the failed entry
     */
    public void retryMessage(String stream, String group, String messageId) {
        String retryKey = RETRY_KEY_PREFIX + messageId;
        int retryCount = parseRetryCount(redisTemplate.opsForValue().get(retryKey));

        if (retryCount >= MAX_RETRY_COUNT) {
            // Retry budget exhausted: move to the dead-letter stream.
            redisTemplate.delete(retryKey);
            deadLetterQueueHandler.handleDeadLetter(stream, group, messageId);
            return;
        }

        StreamOperations<String, Object, Object> ops = redisTemplate.opsForStream();
        Map<Object, Object> message = ops
                .range(stream, Range.from(Range.Bound.inclusive(messageId))
                        .to(Range.Bound.inclusive(messageId)))
                .stream()
                .findFirst()
                .map(MapRecord::getValue)
                .orElse(null);

        if (message != null) {
            // Redeliver as a new entry. The copy gets a new ID, so the counter
            // must follow it; counting on the old ID would never reach the limit.
            String redeliveredId = ops.add(stream, message).getValue();
            redisTemplate.opsForValue()
                    .set(RETRY_KEY_PREFIX + redeliveredId, String.valueOf(retryCount + 1));
        }

        // The original is either superseded by the copy or gone from the
        // stream; in both cases drop its counter and clear it from the pending list.
        redisTemplate.delete(retryKey);
        ops.acknowledge(stream, group, messageId);
    }

    /** Converts the stored counter text to a number; a missing key counts as zero retries. */
    private static int parseRetryCount(String stored) {
        return stored == null ? 0 : Integer.parseInt(stored);
    }
}
实际应用示例
1. 订单处理系统
/**
 * Publishes orders to the {@code order-stream} stream and periodically consumes
 * them as member {@code order-processor} of the {@code order-group} group.
 *
 * <p>NOTE(review): {@code processOrderMessage} is called below but is not
 * defined in this snippet; the actual order handling must be supplied.
 * NOTE(review): the consumer group is not created here; presumably it is
 * created elsewhere before the first poll — confirm.
 */
@Service
public class OrderProcessor {

    @Autowired
    private StreamProducer producer;

    @Autowired
    private StreamConsumer consumer;

    @Autowired
    private MessageRetryHandler retryHandler;

    private static final String ORDER_STREAM = "order-stream";
    private static final String ORDER_GROUP = "order-group";
    private static final String CONSUMER_NAME = "order-processor";

    /**
     * Publishes {@code order} to the order stream with status {@code PENDING}.
     *
     * @param order the order to publish; its id and amount are written as fields
     */
    public void processOrder(Order order) {
        Map<String, String> message = new HashMap<>();
        message.put("orderId", order.getId());
        message.put("amount", order.getAmount().toString());
        message.put("status", "PENDING");
        producer.produceMessage(ORDER_STREAM, message);
    }

    /**
     * Polls the order stream (scheduled at a fixed rate of 1000 ms), processes
     * each entry and acknowledges it; an entry whose processing throws is
     * handed to the retry handler instead.
     */
    @Scheduled(fixedRate = 1000)
    public void consumeOrders() {
        List<MapRecord<String, Object, Object>> messages = consumer
                .consumeMessages(ORDER_STREAM, ORDER_GROUP, CONSUMER_NAME);
        if (messages == null) {
            return;
        }
        for (MapRecord<String, Object, Object> message : messages) {
            // getId() is a RecordId; the consumer/retry APIs take the textual ID.
            String messageId = message.getId().getValue();
            try {
                processOrderMessage(message);
                consumer.acknowledgeMessage(ORDER_STREAM, ORDER_GROUP, messageId);
            } catch (Exception e) {
                // Processing failed: schedule a retry (or dead-lettering).
                // NOTE(review): the exception itself is not logged.
                retryHandler.retryMessage(ORDER_STREAM, ORDER_GROUP, messageId);
            }
        }
    }
}
2. 日志收集系统
/**
 * Forwards log entries to the {@code log-stream} stream via {@link StreamProducer}.
 */
@Service
public class LogCollector {

    private static final String LOG_STREAM = "log-stream";

    @Autowired
    private StreamProducer producer;

    /**
     * Publishes a single log entry.
     *
     * @param log the entry to publish
     */
    public void collectLog(LogEntry log) {
        producer.produceMessage(LOG_STREAM, toMessage(log));
    }

    /**
     * Publishes several log entries through the producer's batch method,
     * keeping the order of {@code logs}.
     *
     * @param logs the entries to publish
     */
    public void collectBatchLogs(List<LogEntry> logs) {
        List<Map<String, String>> batch = logs.stream()
                .map(LogCollector::toMessage)
                .collect(Collectors.toList());
        producer.produceBatchMessages(LOG_STREAM, batch);
    }

    /** Maps a log entry to the stream fields timestamp, level, message and source. */
    private static Map<String, String> toMessage(LogEntry entry) {
        Map<String, String> fields = new HashMap<>();
        fields.put("timestamp", entry.getTimestamp());
        fields.put("level", entry.getLevel());
        fields.put("message", entry.getMessage());
        fields.put("source", entry.getSource());
        return fields;
    }
}
最佳实践
1. 性能优化
- 使用批量操作
- 合理设置消费者数量
- 使用连接池
- 定期清理过期数据(例如通过 XTRIM 或 XADD 的 MAXLEN 选项限制 Stream 长度)
2. 可靠性保证
- 实现消息重试
- 处理死信消息
- 监控消息积压(例如通过 XPENDING 查看各消费者组的待确认消息数量)
- 备份重要消息
3. 运维建议
- 监控Stream长度
- 设置告警阈值
- 定期维护消费者组
- 清理过期数据