Skip to content

设计一个消息队列系统

问题描述

电商系统需要实现订单异步处理:用户下单后立即返回,后台异步处理库存扣减、优惠券核销、积分发放等操作。

需要设计一个可靠的消息队列架构。

传统思路

  1. 搜索博客文章 - 质量参差不齐,很多过时
  2. 看官方文档 - 信息量大,不知道从哪开始
  3. 参考开源项目 - 别人的场景未必适合
  4. 凭经验设计 - 可能遗漏关键点

预计耗时:1-2天调研 + 设计

AI思维

第一步:明确需求

我需要设计一个消息队列架构,用于电商订单异步处理。

## 业务场景
用户下单后,需要异步执行:
1. 库存扣减
2. 优惠券核销
3. 积分发放
4. 发送通知(短信/推送)

## 要求
- 消息不能丢失
- 支持失败重试
- 可以监控消息处理状态
- 日订单量 10 万级别

## 技术栈
- Java Spring Boot
- MySQL
- 已有 Redis

请帮我设计整体架构,并给出技术选型建议。

第二步:AI 架构分析

AI 回复:

基于你的需求,我推荐以下架构:

技术选型建议

选项优势劣势推荐度
RabbitMQ成熟稳定,功能完善运维成本⭐⭐⭐⭐⭐
Redis Stream已有 Redis,无新组件功能有限⭐⭐⭐⭐
Kafka吞吐量高对你的量级过重⭐⭐⭐
本地消息表简单可靠需要轮询⭐⭐⭐⭐

推荐:RabbitMQ(10万级订单最合适)

如果不想引入新组件:本地消息表 + Redis 调度

第三步:深入设计

我选择 RabbitMQ。请给我详细的架构设计,包括:
1. Exchange 和 Queue 的设计
2. 消息格式
3. 消费者设计
4. 失败处理机制
5. 监控方案

AI 给出详细设计:

## 1. Exchange 和 Queue 设计

┌─────────────────────────────────────────────────────────┐
│                    order.exchange                        │
│                   (Topic Exchange)                       │
└─────────────┬───────────────┬───────────────┬───────────┘
              │               │               │
              ▼               ▼               ▼
    ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
    │ inventory   │  │  coupon     │  │  points     │
    │   .queue    │  │   .queue    │  │   .queue    │
    └─────────────┘  └─────────────┘  └─────────────┘
              │               │               │
              ▼               ▼               ▼
         (死信队列)      (死信队列)      (死信队列)
    ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
    │ inventory   │  │  coupon     │  │  points     │
    │   .dlq      │  │   .dlq      │  │   .dlq      │
    └─────────────┘  └─────────────┘  └─────────────┘
java
## 2. 消息格式

@Data
public class OrderMessage {
    private String messageId;      // 幂等性保证
    private String orderId;
    private Long userId;
    private BigDecimal amount;
    private String couponId;
    private Integer pointsToAdd;
    private LocalDateTime createTime;
    private Integer retryCount;    // 重试次数
}
java
## 3. 生产者设计

@Service
public class OrderMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void sendOrderCreatedMessage(Order order) {
        OrderMessage message = buildMessage(order);

        // 先保存到本地消息表(保证消息不丢)
        messageRepository.save(message);

        // 发送到 MQ
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.created",
            message,
            msg -> {
                msg.getMessageProperties().setMessageId(message.getMessageId());
                return msg;
            }
        );
    }
}
java
## 4. 消费者设计

@Component
@RabbitListener(queues = "inventory.queue")
public class InventoryConsumer {

    @RabbitHandler
    public void handleMessage(OrderMessage message, Channel channel,
                              @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            // 幂等性检查
            if (messageRepository.isProcessed(message.getMessageId())) {
                channel.basicAck(tag, false);
                return;
            }

            // 业务处理
            inventoryService.deduct(message.getOrderId());

            // 标记已处理
            messageRepository.markProcessed(message.getMessageId());

            // 确认消息
            channel.basicAck(tag, false);

        } catch (Exception e) {
            handleFailure(message, channel, tag, e);
        }
    }

    private void handleFailure(OrderMessage message, Channel channel,
                               long tag, Exception e) {
        if (message.getRetryCount() < 3) {
            // 重试:拒绝并重新入队
            channel.basicNack(tag, false, true);
        } else {
            // 超过重试次数,进入死信队列
            channel.basicNack(tag, false, false);
            alertService.alert("消息处理失败", message, e);
        }
    }
}
yaml
## 5. RabbitMQ 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认
        prefetch: 10              # 预取数量
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000

第四步:补充关键点

设计看起来不错。请补充:
1. 如何保证消息不丢失?
2. 如何处理消息重复?
3. 如何监控消息积压?

AI 补充:

消息不丢失的三重保障:

  1. 生产者确认
java
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
    if (!ack) {
        // 发送失败,从本地消息表重发
        retryFromLocalMessage(data.getId());
    }
});
  1. 消息持久化
java
@Bean
public Queue inventoryQueue() {
    return QueueBuilder.durable("inventory.queue")
        .withArgument("x-dead-letter-exchange", "dlx.exchange")
        .build();
}
  1. 消费者手动确认
java
channel.basicAck(tag, false);  // 处理成功才确认

消息重复处理(幂等性):

java
// 方案1:数据库唯一约束
INSERT INTO processed_messages (message_id) VALUES (?)
ON DUPLICATE KEY UPDATE processed_at = NOW();

// 方案2:Redis 原子操作
Boolean isNew = redisTemplate.opsForValue()
    .setIfAbsent("msg:" + messageId, "1", 24, TimeUnit.HOURS);
if (!isNew) return; // 已处理

监控方案:

java
// Prometheus metrics
@Scheduled(fixedRate = 60000)
public void collectMetrics() {
    QueueInfo info = rabbitAdmin.getQueueInfo("inventory.queue");
    gauge.set(info.getMessageCount());

    if (info.getMessageCount() > 10000) {
        alertService.alert("消息积压告警");
    }
}

结果对比

指标传统方式AI思维
调研时间1-2天1小时
设计完整度可能遗漏系统全面
最佳实践需要踩坑学习直接获得
代码质量边写边改一次到位

关键洞察

  1. 先明确需求:告诉 AI 你的业务场景和约束条件
  2. 要求完整方案:不只是架构图,还要配置、代码、监控
  3. 追问细节:关键点(不丢失、不重复)要单独深入
  4. 验证可行性:AI 的方案要结合实际情况调整

设计模板

我需要设计 [什么系统]。

## 业务场景
[具体描述业务需求]

## 约束条件
- 性能要求:[QPS/延迟]
- 可靠性要求:[不丢失/不重复/最终一致]
- 运维要求:[是否可引入新组件]

## 现有技术栈
[列出现有的技术组件]

请给我:
1. 技术选型分析
2. 整体架构设计
3. 关键代码示例
4. 运维监控方案

Released under the MIT License.