Java操作Redis Stream
环境配置
1. 依赖配置
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
 
2. 配置类
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        // 设置key的序列化方式
        template.setKeySerializer(new StringRedisSerializer());
        // 设置value的序列化方式
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
 
基础操作
1. 消息生产
@Service
public class StreamProducer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public String produceMessage(String stream, Map<String, String> message) {
        // 使用自动生成的ID
        return redisTemplate.opsForStream()
            .add(StreamOperations.StreamAddOptions.empty(), stream, message);
        // 使用自定义ID
        // return redisTemplate.opsForStream()
        //     .add(StreamOperations.StreamAddOptions.empty()
        //         .withId("custom-id"), stream, message);
    }
    public List<String> produceBatchMessages(String stream, List<Map<String, String>> messages) {
        return redisTemplate.opsForStream()
            .add(StreamOperations.StreamAddOptions.empty(), stream, messages);
    }
}
 
2. 消息消费
@Service
public class StreamConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public List<MapRecord<String, Object, Object>> consumeMessages(String stream, String group, String consumer) {
        return redisTemplate.opsForStream()
            .read(Consumer.from(group, consumer),
                StreamReadOptions.empty(),
                StreamOffset.create(stream, ReadOffset.lastConsumed()));
    }
    public void acknowledgeMessage(String stream, String group, String messageId) {
        redisTemplate.opsForStream()
            .acknowledge(group, stream, messageId);
    }
}
 
高级特性实现
1. 消费者组管理
@Service
public class StreamGroupManager {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public void createGroup(String stream, String group) {
        try {
            redisTemplate.opsForStream()
                .createGroup(stream, group);
        } catch (Exception e) {
            // 组已存在,忽略异常
        }
    }
    public void deleteGroup(String stream, String group) {
        redisTemplate.opsForStream()
            .deleteGroup(stream, group);
    }
    public StreamInfo.XInfoGroup getGroupInfo(String stream, String group) {
        return redisTemplate.opsForStream()
            .info(stream)
            .getGroups()
            .stream()
            .filter(g -> g.groupName().equals(group))
            .findFirst()
            .orElse(null);
    }
}
 
2. 死信队列处理
@Service
public class DeadLetterQueueHandler {
    @Autowired
    private StringRedisTemplate redisTemplate;
    private static final String DEAD_LETTER_QUEUE = "dead-letter-queue";
    public void handleDeadLetter(String stream, String group, String messageId) {
        // 获取消息内容
        Map<Object, Object> message = redisTemplate.opsForStream()
            .range(stream, Range.from(Range.Bound.inclusive(messageId))
                .to(Range.Bound.inclusive(messageId)))
            .stream()
            .findFirst()
            .map(MapRecord::getValue)
            .orElse(null);
        if (message != null) {
            // 将消息发送到死信队列
            redisTemplate.opsForStream()
                .add(StreamOperations.StreamAddOptions.empty(),
                    DEAD_LETTER_QUEUE,
                    message);
            // 确认原消息
            redisTemplate.opsForStream()
                .acknowledge(group, stream, messageId);
        }
    }
}
 
3. 消息重试机制
@Service
public class MessageRetryHandler {
    @Autowired
    private StringRedisTemplate redisTemplate;
    private static final int MAX_RETRY_COUNT = 3;
    public void retryMessage(String stream, String group, String messageId) {
        // 获取消息重试次数
        String retryKey = "retry:" + messageId;
        Integer retryCount = (Integer) redisTemplate.opsForValue()
            .get(retryKey);
        if (retryCount == null) {
            retryCount = 0;
        }
        if (retryCount < MAX_RETRY_COUNT) {
            // 增加重试次数
            redisTemplate.opsForValue()
                .increment(retryKey);
            // 重新投递消息
            Map<Object, Object> message = redisTemplate.opsForStream()
                .range(stream, Range.from(Range.Bound.inclusive(messageId))
                    .to(Range.Bound.inclusive(messageId)))
                .stream()
                .findFirst()
                .map(MapRecord::getValue)
                .orElse(null);
            if (message != null) {
                redisTemplate.opsForStream()
                    .add(StreamOperations.StreamAddOptions.empty(),
                        stream,
                        message);
            }
        } else {
            // 超过最大重试次数,进入死信队列
            handleDeadLetter(stream, group, messageId);
        }
    }
}
 
实际应用示例
1. 订单处理系统
@Service
public class OrderProcessor {
    @Autowired
    private StreamProducer producer;
    @Autowired
    private StreamConsumer consumer;
    private static final String ORDER_STREAM = "order-stream";
    private static final String ORDER_GROUP = "order-group";
    public void processOrder(Order order) {
        // 发送订单消息
        Map<String, String> message = new HashMap<>();
        message.put("orderId", order.getId());
        message.put("amount", order.getAmount().toString());
        message.put("status", "PENDING");
        producer.produceMessage(ORDER_STREAM, message);
    }
    @Scheduled(fixedRate = 1000)
    public void consumeOrders() {
        List<MapRecord<String, Object, Object>> messages = consumer
            .consumeMessages(ORDER_STREAM, ORDER_GROUP, "order-processor");
        for (MapRecord<String, Object, Object> message : messages) {
            try {
                // 处理订单
                processOrderMessage(message);
                // 确认消息
                consumer.acknowledgeMessage(ORDER_STREAM, ORDER_GROUP, message.getId());
            } catch (Exception e) {
                // 处理失败,进入重试
                retryMessage(ORDER_STREAM, ORDER_GROUP, message.getId());
            }
        }
    }
}
 
2. 日志收集系统
@Service
public class LogCollector {
    @Autowired
    private StreamProducer producer;
    private static final String LOG_STREAM = "log-stream";
    public void collectLog(LogEntry log) {
        Map<String, String> message = new HashMap<>();
        message.put("timestamp", log.getTimestamp());
        message.put("level", log.getLevel());
        message.put("message", log.getMessage());
        message.put("source", log.getSource());
        producer.produceMessage(LOG_STREAM, message);
    }
    public void collectBatchLogs(List<LogEntry> logs) {
        List<Map<String, String>> messages = logs.stream()
            .map(log -> {
                Map<String, String> message = new HashMap<>();
                message.put("timestamp", log.getTimestamp());
                message.put("level", log.getLevel());
                message.put("message", log.getMessage());
                message.put("source", log.getSource());
                return message;
            })
            .collect(Collectors.toList());
        producer.produceBatchMessages(LOG_STREAM, messages);
    }
}
 
最佳实践
1. 性能优化
- 使用批量操作
 
- 合理设置消费者数量
 
- 使用连接池
 
- 定期清理过期数据
 
2. 可靠性保证
- 实现消息重试
 
- 处理死信消息
 
- 监控消息积压
 
- 备份重要消息
 
3. 运维建议
- 监控Stream长度
 
- 设置告警阈值
 
- 定期维护消费者组
 
- 清理过期数据