主题
RocketMQ
是什么
RocketMQ 是一个分布式消息中间件,最初由阿里巴巴开发,并于2016年捐赠给了Apache软件基金会。
它设计用于高吞吐量、低延迟的消息传递场景,适用于大规模分布式系统中的异步通信、事件驱动架构和解耦服务之间的交互。
特点
- 高吞吐量:能够处理每秒数以万计的消息,适合需要处理大量数据的场景。
- 低延迟:优化了消息传输路径,确保消息可以快速被消费,满足对实时性要求较高的应用需求。
- 持久化支持:提供消息持久化存储机制,即使在服务器崩溃的情况下也能保证消息不丢失。
- 高可用性:通过主从复制和自动故障转移机制来保障系统的高可用性。
- 灵活的消息模型:支持多种消息模型,包括点对点(Queue)、发布/订阅(Topic)等。
- 丰富的消息类型:除了普通消息外,还支持顺序消息、事务消息、定时消息等多种类型。
- 强大的扩展能力:易于水平扩展,可以根据业务增长情况动态调整集群规模。
如何使用
1. 引入依赖
如果你正在使用 Maven 构建工具,可以在 pom.xml
文件中添加 RocketMQ 的客户端依赖:
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version> <!-- 或者选择其他合适的版本 -->
</dependency>
2. 安装并启动 RocketMQ
首先,你需要下载 RocketMQ 并进行安装。可以从 Apache RocketMQ Releases 页面获取最新版本。下载后解压并按照官方文档指引启动 Name Server 和 Broker。
启动 Name Server:
bash
nohup sh bin/mqnamesrv &
启动 Broker:
bash
nohup sh bin/mqbroker -n localhost:9876 &
3. 发送消息
创建生产者发送消息到指定的主题(Topic):
java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 实例化生产者组名
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息实例,指定主题、标签和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
producer.send(msg);
}
// 关闭生产者实例
producer.shutdown();
}
}
4. 接收消息
创建消费者监听特定主题的消息:
java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
通过上述步骤,就可以建立起基本的 RocketMQ 生产者和消费者的例子,从而实现消息的发送与接收。
RocketMQ 提供的强大功能使得它可以适应各种复杂的业务场景,无论是简单的消息传递还是复杂的企业级应用集成。