跳转至
用户认证 - eMac Tech Docs

用户认证

重要提醒

为了您的权益以及学习体验,请不要外借账号!

如果您没有账号或忘记密码,请联系管理员重置密码。

请输入您的登录信息

Java操作Redis Stream

环境配置

1. 依赖配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2. 配置类

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // 设置key的序列化方式
        template.setKeySerializer(new StringRedisSerializer());
        // 设置value的序列化方式
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        return template;
    }
}

基础操作

1. 消息生产

@Service
public class StreamProducer {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String produceMessage(String stream, Map<String, String> message) {
        // 使用自动生成的ID
        return redisTemplate.opsForStream()
            .add(StreamOperations.StreamAddOptions.empty(), stream, message);

        // 使用自定义ID
        // return redisTemplate.opsForStream()
        //     .add(StreamOperations.StreamAddOptions.empty()
        //         .withId("custom-id"), stream, message);
    }

    public List<String> produceBatchMessages(String stream, List<Map<String, String>> messages) {
        return redisTemplate.opsForStream()
            .add(StreamOperations.StreamAddOptions.empty(), stream, messages);
    }
}

2. 消息消费

@Service
public class StreamConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public List<MapRecord<String, Object, Object>> consumeMessages(String stream, String group, String consumer) {
        return redisTemplate.opsForStream()
            .read(Consumer.from(group, consumer),
                StreamReadOptions.empty(),
                StreamOffset.create(stream, ReadOffset.lastConsumed()));
    }

    public void acknowledgeMessage(String stream, String group, String messageId) {
        redisTemplate.opsForStream()
            .acknowledge(group, stream, messageId);
    }
}

高级特性实现

1. 消费者组管理

@Service
public class StreamGroupManager {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public void createGroup(String stream, String group) {
        try {
            redisTemplate.opsForStream()
                .createGroup(stream, group);
        } catch (Exception e) {
            // 组已存在,忽略异常
        }
    }

    public void deleteGroup(String stream, String group) {
        redisTemplate.opsForStream()
            .deleteGroup(stream, group);
    }

    public StreamInfo.XInfoGroup getGroupInfo(String stream, String group) {
        return redisTemplate.opsForStream()
            .info(stream)
            .getGroups()
            .stream()
            .filter(g -> g.groupName().equals(group))
            .findFirst()
            .orElse(null);
    }
}

2. 死信队列处理

@Service
public class DeadLetterQueueHandler {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String DEAD_LETTER_QUEUE = "dead-letter-queue";

    public void handleDeadLetter(String stream, String group, String messageId) {
        // 获取消息内容
        Map<Object, Object> message = redisTemplate.opsForStream()
            .range(stream, Range.from(Range.Bound.inclusive(messageId))
                .to(Range.Bound.inclusive(messageId)))
            .stream()
            .findFirst()
            .map(MapRecord::getValue)
            .orElse(null);

        if (message != null) {
            // 将消息发送到死信队列
            redisTemplate.opsForStream()
                .add(StreamOperations.StreamAddOptions.empty(),
                    DEAD_LETTER_QUEUE,
                    message);

            // 确认原消息
            redisTemplate.opsForStream()
                .acknowledge(group, stream, messageId);
        }
    }
}

3. 消息重试机制

@Service
public class MessageRetryHandler {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final int MAX_RETRY_COUNT = 3;

    public void retryMessage(String stream, String group, String messageId) {
        // 获取消息重试次数
        String retryKey = "retry:" + messageId;
        Integer retryCount = (Integer) redisTemplate.opsForValue()
            .get(retryKey);

        if (retryCount == null) {
            retryCount = 0;
        }

        if (retryCount < MAX_RETRY_COUNT) {
            // 增加重试次数
            redisTemplate.opsForValue()
                .increment(retryKey);

            // 重新投递消息
            Map<Object, Object> message = redisTemplate.opsForStream()
                .range(stream, Range.from(Range.Bound.inclusive(messageId))
                    .to(Range.Bound.inclusive(messageId)))
                .stream()
                .findFirst()
                .map(MapRecord::getValue)
                .orElse(null);

            if (message != null) {
                redisTemplate.opsForStream()
                    .add(StreamOperations.StreamAddOptions.empty(),
                        stream,
                        message);
            }
        } else {
            // 超过最大重试次数,进入死信队列
            handleDeadLetter(stream, group, messageId);
        }
    }
}

实际应用示例

1. 订单处理系统

@Service
public class OrderProcessor {
    @Autowired
    private StreamProducer producer;
    @Autowired
    private StreamConsumer consumer;

    private static final String ORDER_STREAM = "order-stream";
    private static final String ORDER_GROUP = "order-group";

    public void processOrder(Order order) {
        // 发送订单消息
        Map<String, String> message = new HashMap<>();
        message.put("orderId", order.getId());
        message.put("amount", order.getAmount().toString());
        message.put("status", "PENDING");

        producer.produceMessage(ORDER_STREAM, message);
    }

    @Scheduled(fixedRate = 1000)
    public void consumeOrders() {
        List<MapRecord<String, Object, Object>> messages = consumer
            .consumeMessages(ORDER_STREAM, ORDER_GROUP, "order-processor");

        for (MapRecord<String, Object, Object> message : messages) {
            try {
                // 处理订单
                processOrderMessage(message);
                // 确认消息
                consumer.acknowledgeMessage(ORDER_STREAM, ORDER_GROUP, message.getId());
            } catch (Exception e) {
                // 处理失败,进入重试
                retryMessage(ORDER_STREAM, ORDER_GROUP, message.getId());
            }
        }
    }
}

2. 日志收集系统

@Service
public class LogCollector {
    @Autowired
    private StreamProducer producer;

    private static final String LOG_STREAM = "log-stream";

    public void collectLog(LogEntry log) {
        Map<String, String> message = new HashMap<>();
        message.put("timestamp", log.getTimestamp());
        message.put("level", log.getLevel());
        message.put("message", log.getMessage());
        message.put("source", log.getSource());

        producer.produceMessage(LOG_STREAM, message);
    }

    public void collectBatchLogs(List<LogEntry> logs) {
        List<Map<String, String>> messages = logs.stream()
            .map(log -> {
                Map<String, String> message = new HashMap<>();
                message.put("timestamp", log.getTimestamp());
                message.put("level", log.getLevel());
                message.put("message", log.getMessage());
                message.put("source", log.getSource());
                return message;
            })
            .collect(Collectors.toList());

        producer.produceBatchMessages(LOG_STREAM, messages);
    }
}

最佳实践

1. 性能优化

  • 使用批量操作
  • 合理设置消费者数量
  • 使用连接池
  • 定期清理过期数据

2. 可靠性保证

  • 实现消息重试
  • 处理死信消息
  • 监控消息积压
  • 备份重要消息

3. 运维建议

  • 监控Stream长度
  • 设置告警阈值
  • 定期维护消费者组
  • 清理过期数据