Skip to content

消息队列方案

在高并发场景下,消息队列(Message Queue)可以作为一种有效的技术手段来优化系统性能和处理能力。

应用场景

  1. 异步处理

    • 当一个请求或事件发生时,不直接执行耗时的操作(如数据库写入、复杂计算等),而是将任务封装成消息投递到消息队列中。
    • 消费者线程从队列中取出消息进行处理,这样主线程无需等待这些操作完成就可以快速响应,从而提高系统的吞吐量和响应速度。
  2. 解耦服务(业务解耦)

    • 消息队列作为不同服务之间的通信媒介,能够降低服务间的耦合度。
    • 生产者只需要将消息发送到队列中,消费者负责处理这些消息,两者互不影响,各自扩展和维护也更灵活。
  3. 流量削峰

    • 在高峰期,消息队列能起到缓冲的作用,防止大量请求瞬间涌入后端系统造成系统崩溃。
    • 当请求过多时,可以把请求放入队列排队处理,通过控制消费速率来实现系统负载均衡。
  4. 优先级调度

    • 许多消息队列支持消息优先级设置,可以根据业务需求对不同优先级的消息进行差异化处理,确保重要且紧急的任务优先得到执行。
  5. 数据一致性

    • 使用事务消息或分布式事务方案(例如两阶段提交或最终一致性方案),确保在分布式环境下,即使出现网络故障或其他异常情况,也能保证数据的一致性。
  6. 弹性伸缩

    • 根据队列中的消息堆积情况动态增减消费者实例,实现资源的有效利用。
    • 当消息积压时增加消费者数量加快处理速度;当消息减少时减少消费者以节省资源。
  7. 故障隔离与恢复

    • 消息队列的存在使得生产者和消费者之间具有一定的隔离性,即使消费者服务出现故障,也不会直接影响生产者的正常运行。
    • 一旦消费者服务恢复正常,可以从队列中继续消费消息,实现故障后的自动恢复
  8. 日志处理与分析

    • 高并发场景下的日志处理可以通过消息队列异步收集并分发给日志分析系统,避免同步处理带来的压力。

特性

业务无关:只做消息分发

FIFO:先投递先到达

容灾:节点的动态增删和消息的持久化

性能:吞吐量提升,系统内部通信效率提高

常用中间件

特性RocketMQRabbitMQKafka
开发语言JavaErlangScala、Java
协议支持MQTT、HTTP、TCPAMQP、STOMP、MQTT自定义协议(基于TCP)
消息顺序支持严格的消息顺序保证支持消息顺序保证(有序队列)分区内消息顺序保证
消息持久化支持消息持久化到磁盘支持消息持久化到磁盘支持消息持久化到磁盘
可靠性提供高可靠性的消息传输提供高可靠性的消息传输提供高可靠性的消息传输
可用性提供高可用性的消息存储和传输提供高可用性的消息存储和传输提供高可用性的消息存储和传输
性能适用于大规模消息流和高吞吐量的场景适用于低延迟和高吞吐量的场景适用于高吞吐量和低延迟的场景
扩展性支持集群部署和水平扩展支持集群部署和水平扩展支持分布式、水平扩展和动态伸缩
社区活跃度Apache RocketMQ 社区活跃度较高RabbitMQ 社区活跃度较高Apache Kafka 社区活跃度较高
适用场景金融、电商、大数据等领域即时通讯、消息推送等领域实时日志处理、流式处理等领域

RocketMQ

官网 5.x文档 Quick Start

核心组件:

  1. Namesrv(Name Server):命名服务,负责管理整个 RocketMQ 集群的元数据信息,包括 Topic、Producer、Consumer 等信息的注册和路由。

  2. Broker:消息存储和消息传输的核心组件,负责存储消息数据和提供消息的读写服务。

  3. Producer:消息生产者,负责将消息发送到 Broker。

  4. Consumer:消息消费者,负责从 Broker 拉取消息并进行消费。

工作原理:

  1. Producer 发送消息到 RocketMQ 集群中的一个 Broker,Broker 将消息存储在主题(Topic)的队列中。

  2. Consumer 从指定的主题(Topic)的队列中拉取消息,进行消费。

  3. Namesrv 管理整个集群的元数据信息,包括 Topic、Producer、Consumer 等信息的注册和路由,以及负载均衡等功能。

示例

添加依赖

xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

添加配置

# RocketMQ 配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.consumer.group=my-group

消息生产者和消费者

java
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RocketMQProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String topic, String message) {
        rocketMQTemplate.send(topic, MessageBuilder.withPayload(message).build());
    }
}

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-group")
public class RocketMQConsumer {

    @org.apache.rocketmq.spring.annotation.MessageListener(topic = "my-topic", consumerGroup = "my-group")
    public void handleMessage(@Payload String message, @Headers MessageHeaders headers) {
        System.out.println("Received message: " + message);
    }
}

发送消息

java
@Autowired
private RocketMQProducer rocketMQProducer;

public void sendMessage() {
    rocketMQProducer.sendMessage("my-topic", "Hello, RocketMQ!");
}

RabbitMQ

官网 文档

核心组件:

  1. Exchange:交换机,负责接收生产者发送的消息,并根据路由键(Routing Key)将消息路由到队列(Queue)中。

  2. Queue:队列,用于存储消息,消费者从队列中获取消息进行消费。

  3. Binding:绑定,用于绑定 Exchange 和 Queue,规定消息从 Exchange 到达 Queue 的路由规则。

  4. Connection:连接,生产者和消费者与 RabbitMQ 服务器之间的连接。

  5. Channel:信道,通过连接创建的虚拟连接,用于进行消息的传输和交互。

工作原理:

  1. Producer 将消息发送到 Exchange,Exchange 根据绑定的路由规则将消息路由到指定的 Queue。

  2. Consumer 从指定的 Queue 中获取消息,进行消费。

  3. RabbitMQ 通过 Exchange、Queue 和 Binding 来实现消息的路由和传递,提供了灵活的消息路由和分发机制。

示例

java
@Data
public class Message {
    private Long id;
    private String msg;
    private Date sendTime;
}
java
public interface QueueConstants {

    String TEST = "test";

    String MESSAGE = "message";
}
java
@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue queue() {
        return new Queue(QueueConstants.TEST);
    }
}
java
@Slf4j
@Configuration
public class RabbitMqServer {

    @RabbitListener(queues = QueueConstants.TEST)
    public void receive(String message) {
        log.info("{}", message);
    }
}
java
@Slf4j
@Configuration
public class RabbitMqClient {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend(QueueConstants.TEST,message);
    }
}

发送消息

java
rabbitMqClient.send(message);

maven依赖

xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.4.RELEASE</version>
</dependency>

Kafka

官网 文档

核心组件:

  1. Producer:消息生产者,负责将消息发送到 Kafka 集群中的 Broker。

  2. Broker:Kafka 集群中的消息存储和消息传输的核心组件,负责存储消息数据和提供消息的读写服务。

  3. Consumer:消息消费者,负责从 Kafka 集群中的 Broker 拉取消息并进行消费。

  4. Topic:主题,消息的逻辑分类单元,每个主题包含多个分区(Partition)。

  5. Partition:分区,每个主题可以分为多个分区,每个分区是一个有序的消息队列,分区中的消息按顺序存储。

工作原理:

  1. Producer 将消息发送到指定的 Topic 中,Kafka 将消息追加到该 Topic 对应的分区中。

  2. Consumer 从指定的分区中拉取消息,并进行消费。

  3. Kafka 的分区机制保证了消息的顺序性和可伸缩性,通过分布式的消费者组实现了高吞吐量和高可用性。

示例

java
public interface TopicConstants {

    String TEST = "test";

    String MESSAGE = "message";
}
java
@Component
@Slf4j
public class KafkaReceiver {

    @KafkaListener(topics = {TopicConstants.TEST})
    public void receive(ConsumerRecord<?, ?> record) {
        log.info("record:{}", record);
    }
}
java
@Slf4j
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send(String msg) {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(msg);
        message.setSendTime(new Date());
        log.info("send message:{}", message);
        kafkaTemplate.send(TopicConstants.TEST, gson.toJson(message));
    }
}

发送消息

java
kafkaSender.send(message);

maven依赖

xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.5.RELEASE</version>
</dependency>