设计一个消息队列系统
问题描述
电商系统需要实现订单异步处理:用户下单后立即返回,后台异步处理库存扣减、优惠券核销、积分发放等操作。
需要设计一个可靠的消息队列架构。
传统思路
- 搜索博客文章 - 质量参差不齐,很多过时
- 看官方文档 - 信息量大,不知道从哪开始
- 参考开源项目 - 别人的场景未必适合
- 凭经验设计 - 可能遗漏关键点
预计耗时:1-2天调研 + 设计
AI思维
第一步:明确需求
我需要设计一个消息队列架构,用于电商订单异步处理。
## 业务场景
用户下单后,需要异步执行:
1. 库存扣减
2. 优惠券核销
3. 积分发放
4. 发送通知(短信/推送)
## 要求
- 消息不能丢失
- 支持失败重试
- 可以监控消息处理状态
- 日订单量 10 万级别
## 技术栈
- Java Spring Boot
- MySQL
- 已有 Redis
请帮我设计整体架构,并给出技术选型建议。第二步:AI 架构分析
AI 回复:
基于你的需求,我推荐以下架构:
技术选型建议
选项 优势 劣势 推荐度 RabbitMQ 成熟稳定,功能完善 运维成本 ⭐⭐⭐⭐⭐ Redis Stream 已有 Redis,无新组件 功能有限 ⭐⭐⭐⭐ Kafka 吞吐量高 对你的量级过重 ⭐⭐⭐ 本地消息表 简单可靠 需要轮询 ⭐⭐⭐⭐ 推荐:RabbitMQ(10万级订单最合适)
如果不想引入新组件:本地消息表 + Redis 调度
第三步:深入设计
我选择 RabbitMQ。请给我详细的架构设计,包括:
1. Exchange 和 Queue 的设计
2. 消息格式
3. 消费者设计
4. 失败处理机制
5. 监控方案AI 给出详细设计:
## 1. Exchange 和 Queue 设计
┌─────────────────────────────────────────────────────────┐
│ order.exchange │
│ (Topic Exchange) │
└─────────────┬───────────────┬───────────────┬───────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ inventory │ │ coupon │ │ points │
│ .queue │ │ .queue │ │ .queue │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
(死信队列) (死信队列) (死信队列)
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ inventory │ │ coupon │ │ points │
│ .dlq │ │ .dlq │ │ .dlq │
└─────────────┘ └─────────────┘ └─────────────┘java
## 2. 消息格式
@Data
public class OrderMessage {
private String messageId; // 幂等性保证
private String orderId;
private Long userId;
private BigDecimal amount;
private String couponId;
private Integer pointsToAdd;
private LocalDateTime createTime;
private Integer retryCount; // 重试次数
}java
## 3. 生产者设计
@Service
public class OrderMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void sendOrderCreatedMessage(Order order) {
OrderMessage message = buildMessage(order);
// 先保存到本地消息表(保证消息不丢)
messageRepository.save(message);
// 发送到 MQ
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
message,
msg -> {
msg.getMessageProperties().setMessageId(message.getMessageId());
return msg;
}
);
}
}java
## 4. 消费者设计
@Component
@RabbitListener(queues = "inventory.queue")
public class InventoryConsumer {
@RabbitHandler
public void handleMessage(OrderMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 幂等性检查
if (messageRepository.isProcessed(message.getMessageId())) {
channel.basicAck(tag, false);
return;
}
// 业务处理
inventoryService.deduct(message.getOrderId());
// 标记已处理
messageRepository.markProcessed(message.getMessageId());
// 确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
handleFailure(message, channel, tag, e);
}
}
private void handleFailure(OrderMessage message, Channel channel,
long tag, Exception e) {
if (message.getRetryCount() < 3) {
// 重试:拒绝并重新入队
channel.basicNack(tag, false, true);
} else {
// 超过重试次数,进入死信队列
channel.basicNack(tag, false, false);
alertService.alert("消息处理失败", message, e);
}
}
}yaml
## 5. RabbitMQ 配置
spring:
rabbitmq:
host: localhost
port: 5672
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 10 # 预取数量
retry:
enabled: true
max-attempts: 3
initial-interval: 1000第四步:补充关键点
设计看起来不错。请补充:
1. 如何保证消息不丢失?
2. 如何处理消息重复?
3. 如何监控消息积压?AI 补充:
消息不丢失的三重保障:
- 生产者确认
javarabbitTemplate.setConfirmCallback((data, ack, cause) -> { if (!ack) { // 发送失败,从本地消息表重发 retryFromLocalMessage(data.getId()); } });
- 消息持久化
java@Bean public Queue inventoryQueue() { return QueueBuilder.durable("inventory.queue") .withArgument("x-dead-letter-exchange", "dlx.exchange") .build(); }
- 消费者手动确认
javachannel.basicAck(tag, false); // 处理成功才确认消息重复处理(幂等性):
java// 方案1:数据库唯一约束 INSERT INTO processed_messages (message_id) VALUES (?) ON DUPLICATE KEY UPDATE processed_at = NOW(); // 方案2:Redis 原子操作 Boolean isNew = redisTemplate.opsForValue() .setIfAbsent("msg:" + messageId, "1", 24, TimeUnit.HOURS); if (!isNew) return; // 已处理监控方案:
java// Prometheus metrics @Scheduled(fixedRate = 60000) public void collectMetrics() { QueueInfo info = rabbitAdmin.getQueueInfo("inventory.queue"); gauge.set(info.getMessageCount()); if (info.getMessageCount() > 10000) { alertService.alert("消息积压告警"); } }
结果对比
| 指标 | 传统方式 | AI思维 |
|---|---|---|
| 调研时间 | 1-2天 | 1小时 |
| 设计完整度 | 可能遗漏 | 系统全面 |
| 最佳实践 | 需要踩坑学习 | 直接获得 |
| 代码质量 | 边写边改 | 一次到位 |
关键洞察
- 先明确需求:告诉 AI 你的业务场景和约束条件
- 要求完整方案:不只是架构图,还要配置、代码、监控
- 追问细节:关键点(不丢失、不重复)要单独深入
- 验证可行性:AI 的方案要结合实际情况调整
设计模板
我需要设计 [什么系统]。
## 业务场景
[具体描述业务需求]
## 约束条件
- 性能要求:[QPS/延迟]
- 可靠性要求:[不丢失/不重复/最终一致]
- 运维要求:[是否可引入新组件]
## 现有技术栈
[列出现有的技术组件]
请给我:
1. 技术选型分析
2. 整体架构设计
3. 关键代码示例
4. 运维监控方案