Skip to content

RocketMQ

是什么

RocketMQ 是一个分布式消息中间件,最初由阿里巴巴开发,并于2016年捐赠给了Apache软件基金会。

它设计用于高吞吐量、低延迟的消息传递场景,适用于大规模分布式系统中的异步通信、事件驱动架构和解耦服务之间的交互。

特点

  • 高吞吐量:能够处理每秒数以万计的消息,适合需要处理大量数据的场景。
  • 低延迟:优化了消息传输路径,确保消息可以快速被消费,满足对实时性要求较高的应用需求。
  • 持久化支持:提供消息持久化存储机制,即使在服务器崩溃的情况下也能保证消息不丢失。
  • 高可用性:通过主从复制和自动故障转移机制来保障系统的高可用性。
  • 灵活的消息模型:支持多种消息模型,包括点对点(Queue)、发布/订阅(Topic)等。
  • 丰富的消息类型:除了普通消息外,还支持顺序消息、事务消息、定时消息等多种类型。
  • 强大的扩展能力:易于水平扩展,可以根据业务增长情况动态调整集群规模。

如何使用

1. 引入依赖

如果你正在使用 Maven 构建工具,可以在 pom.xml 文件中添加 RocketMQ 的客户端依赖:

xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.3</version> <!-- 或者选择其他合适的版本 -->
</dependency>

2. 安装并启动 RocketMQ

首先,你需要下载 RocketMQ 并进行安装。可以从 Apache RocketMQ Releases 页面获取最新版本。下载后解压并按照官方文档指引启动 Name Server 和 Broker。

启动 Name Server:

bash
nohup sh bin/mqnamesrv &

启动 Broker:

bash
nohup sh bin/mqbroker -n localhost:9876 &

3. 发送消息

创建生产者发送消息到指定的主题(Topic):

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 实例化生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息实例,指定主题、标签和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            producer.send(msg);
        }
        // 关闭生产者实例
        producer.shutdown();
    }
}

4. 接收消息

创建消费者监听特定主题的消息:

java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 实例化消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

通过上述步骤,就可以建立起基本的 RocketMQ 生产者和消费者的例子,从而实现消息的发送与接收。

RocketMQ 提供的强大功能使得它可以适应各种复杂的业务场景,无论是简单的消息传递还是复杂的企业级应用集成。