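The Complete Guide to Building a RocketMQ Cluster: A Highly Available Message Broker from Scratch
Preface
RocketMQ is a distributed messaging middleware open-sourced by Alibaba, known for high throughput, high availability, and low latency. This article walks through building a production-grade RocketMQ cluster from scratch, covering cluster architecture, installation and deployment, configuration tuning, and monitoring and operations.
RocketMQ Cluster Architecture
Core Components
A RocketMQ cluster consists of the following components:
- NameServer: the service registry, which manages Broker routing information
- Broker: the message storage server, which receives, stores, and delivers messages
- Producer: sends messages to Brokers
- Consumer: pulls messages from Brokers
Cluster Modes
- Single master: only one master; high risk, suitable for testing only
- Multiple masters: several masters and no slaves; high throughput but no replicas
- Multiple masters with slaves (asynchronous): each master has a slave and replicates asynchronously; good performance
- Multiple masters with slaves (synchronous): each master has a slave and replicates synchronously; high data reliability
Recommended Production Architecture
For production, a two-master, two-slave architecture with synchronous replication is recommended:
NameServer cluster:
├── NameServer1 (192.168.1.10:9876)
├── NameServer2 (192.168.1.11:9876)
└── NameServer3 (192.168.1.12:9876)
Broker cluster (two masters, two slaves, synchronous replication):
├── BrokerA-Master (192.168.1.20:10911)
├── BrokerA-Slave (192.168.1.21:10921)
├── BrokerB-Master (192.168.1.22:10911)
└── BrokerB-Slave (192.168.1.23:10921)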
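Environment Preparation
System Requirements
- OS: CentOS 7+ or Ubuntu 18.04+
- Java: JDK 1.8+
- Memory: 8 GB or more per machine recommended
- Disk: SSD recommended, 500 GB or more per machine
Firewall Configuration
# CentOS 7
firewall-cmd --permanent --add-port=9876/tcp   # NameServer port
firewall-cmd --permanent --add-port=10911/tcp  # Broker listen port (listenPort)
firewall-cmd --permanent --add-port=10909/tcp  # Broker VIP channel port (listenPort - 2)
firewall-cmd --permanent --add-port=10912/tcp  # Broker HA replication port (listenPort + 1)
# On slave hosts, where listenPort is 10921, open 10921, 10919 and 10922 instead
firewall-cmd --reload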
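The firewall rules follow from how a Broker derives its ports. The sketch below assumes the default behavior, where the VIP channel listens on listenPort - 2 and HA replication on listenPort + 1. The class and method names are illustrative and not part of RocketMQ.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative helper (not part of RocketMQ): lists the TCP ports a Broker
// is assumed to use by default, derived from its listenPort.
final class BrokerPorts {
    // VIP channel port: assumed default of listenPort - 2.
    static int vipPort(int listenPort) {
        return listenPort - 2;
    }

    // HA replication port: assumed default of listenPort + 1.
    static int haPort(int listenPort) {
        return listenPort + 1;
    }

    // All ports to open in the firewall for one Broker, in ascending order.
    static List<Integer> portsToOpen(int listenPort) {
        return Arrays.asList(vipPort(listenPort), listenPort, haPort(listenPort));
    }

    public static void main(String[] args) {
        System.out.println("master: " + portsToOpen(10911));
        System.out.println("slave:  " + portsToOpen(10921));
    }
}
```

If a Broker sets haListenPort explicitly, that value should replace the derived HA port in the list.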
Installation and Deployment
1. Install Java
# Install OpenJDK 8
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# Verify the installation
java -version
# Set JAVA_HOME
echo 'export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk' >> ~/.bashrc
echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
2. Download RocketMQ
# Download RocketMQ 5.x
cd /opt
wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
# Extract
unzip rocketmq-all-5.3.0-bin-release.zip
mv rocketmq-all-5.3.0-bin-release rocketmq
# Create a symlink
ln -s /opt/rocketmq /opt/rocketmq5
3. Adjust JVM Parameters
The default heap sizes in RocketMQ's startup scripts are large, so adjust them to each machine's memory before going to production:
# Edit runbroker.sh (the Broker startup script)
vim /opt/rocketmq/bin/runbroker.sh
# Find the JAVA_OPT heap settings and size them for the machine
JAVA_OPT="${JAVA_OPT} -Xms4g -Xmx4g -Xmn2g"
# Edit runserver.sh (the NameServer startup script)
vim /opt/rocketmq/bin/runserver.sh
# Find the JAVA_OPT heap settings
JAVA_OPT="${JAVA_OPT} -Xms1g -Xmx1g"
Deploying the NameServer Cluster
Run the following on every NameServer node
# Start the NameServer
cd /opt/rocketmq
nohup sh bin/mqnamesrv &
# Verify startup
tail -f ~/logs/rocketmqlogs/namesrv.log
# Check the port
netstat -anp | grep 9876
Enable Start on Boot
# Create the systemd unit file
cat > /etc/systemd/system/rocketmq-namesrv.service << 'EOF'
[Unit]
Description=RocketMQ NameServer
After=network.target
[Service]
# mqnamesrv stays in the foreground, so use Type=simple rather than forking
Type=simple
User=root
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk"
ExecStart=/bin/sh /opt/rocketmq/bin/mqnamesrv
ExecStop=/bin/kill -15 $MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# Enable and start the service
systemctl daemon-reload
systemctl enable rocketmq-namesrv
systemctl start rocketmq-namesrv
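Deploying the Broker Cluster
Create Data Directories
# Create the storage directories
mkdir -p /data/rocketmq/store
mkdir -p /data/rocketmq/commitlog
mkdir -p /data/rocketmq/consumequeue
mkdir -p /data/rocketmq/index
BrokerA-Master Configuration
# Create the configuration file
cat > /opt/rocketmq/conf/broker-a-master.properties << 'EOF'
# Cluster name
clusterName = DefaultCluster
# Broker name
brokerName = broker-a
# brokerId 0 marks the master
brokerId = 0
# NameServer addresses (separate multiple addresses with semicolons)
namesrvAddr = 192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
# Listen ports (the VIP channel uses listenPort - 2 = 10909, so the HA port must not reuse it)
listenPort = 10911
haListenPort = 10912
# Storage paths
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
# Flush policy: ASYNC_FLUSH (asynchronous) or SYNC_FLUSH (synchronous)
flushDiskType = SYNC_FLUSH
# Replication role: ASYNC_MASTER (asynchronous) or SYNC_MASTER (synchronous)
brokerRole = SYNC_MASTER
# Whether the Broker may create topics automatically (convenient for testing; consider false in production)
autoCreateTopicEnable = true
# Whether the Broker may create subscription groups automatically
autoCreateSubscriptionGroup = true
# Maximum message size in bytes (default 4 MB)
maxMessageSize = 4194304
# Capacity of the send thread pool's request queue
sendThreadPoolQueueCapacity = 10000
# Capacity of the pull thread pool's request queue
pullThreadPoolQueueCapacity = 100000
EOF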
BrokerA-Slave Configuration
cat > /opt/rocketmq/conf/broker-a-slave.properties << 'EOF'
# Cluster name
clusterName = DefaultCluster
# Broker name (same as the master)
brokerName = broker-a
# brokerId (must be greater than 0 for a slave)
brokerId = 1
# NameServer addresses
namesrvAddr = 192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
# Listen ports (the VIP channel uses listenPort - 2 = 10919, so the HA port must not reuse it)
listenPort = 10921
haListenPort = 10922
# Storage paths
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
# Flush policy
flushDiskType = SYNC_FLUSH
# Replication role: SLAVE
brokerRole = SLAVE
# Whether the Broker may create topics automatically
autoCreateTopicEnable = true
# Whether the Broker may create subscription groups automatically
autoCreateSubscriptionGroup = true
EOF
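Broker configuration files are Java properties files, and in that format `#` starts a comment only at the beginning of a line. The sketch below uses plain `java.util.Properties` (no RocketMQ classes) to show what happens to a comment placed after a value. The class name and the sample strings are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class PropertiesCommentDemo {
    // Parses properties text and returns the raw value stored under brokerId.
    static String brokerIdOf(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected for an in-memory reader.
            throw new IllegalStateException(e);
        }
        return props.getProperty("brokerId");
    }

    // True if the value can be parsed as a numeric ID.
    static boolean isNumeric(String value) {
        try {
            Long.parseLong(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String inline = "brokerId = 1 # slave id\n";
        String separate = "# slave id\nbrokerId = 1\n";
        System.out.println("[" + brokerIdOf(inline) + "] numeric=" + isNumeric(brokerIdOf(inline)));
        System.out.println("[" + brokerIdOf(separate) + "] numeric=" + isNumeric(brokerIdOf(separate)));
    }
}
```

The trailing text becomes part of the value, so keeping every comment on its own line avoids the problem.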
BrokerB-Master Configuration
cat > /opt/rocketmq/conf/broker-b-master.properties << 'EOF'
# Cluster name
clusterName = DefaultCluster
# Broker name
brokerName = broker-b
brokerId = 0
# NameServer addresses
namesrvAddr = 192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
# Listen ports (the VIP channel uses listenPort - 2 = 10909, so the HA port must not reuse it)
listenPort = 10911
haListenPort = 10912
# Storage paths
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
# Flush policy
flushDiskType = SYNC_FLUSH
# Replication role
brokerRole = SYNC_MASTER
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
EOF
BrokerB-Slave Configuration
cat > /opt/rocketmq/conf/broker-b-slave.properties << 'EOF'
# Cluster name
clusterName = DefaultCluster
# Broker name
brokerName = broker-b
brokerId = 1
# NameServer addresses
namesrvAddr = 192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876
# Listen ports (the VIP channel uses listenPort - 2 = 10919, so the HA port must not reuse it)
listenPort = 10921
haListenPort = 10922
# Storage paths
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
# Flush policy
flushDiskType = SYNC_FLUSH
# Replication role
brokerRole = SLAVE
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
EOF
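The four Broker files differ only in a handful of settings: brokerName, brokerId, brokerRole, and listenPort. As a rough sketch of that pattern, the hypothetical helper below renders just the varying part for a master or a slave, using the values from this article. It is not a RocketMQ tool, and the shared settings would still need to be appended.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BrokerConfigSketch {
    // Builds the settings that vary between the four Broker files.
    // master == true  -> brokerId 0, SYNC_MASTER, listenPort 10911
    // master == false -> brokerId 1, SLAVE,       listenPort 10921
    static Map<String, String> varyingSettings(String brokerName, boolean master) {
        int listenPort = master ? 10911 : 10921;
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("brokerName", brokerName);
        settings.put("brokerId", master ? "0" : "1");
        settings.put("brokerRole", master ? "SYNC_MASTER" : "SLAVE");
        settings.put("listenPort", String.valueOf(listenPort));
        return settings;
    }

    // Renders the settings in the "key = value" form used by the files above.
    static String render(Map<String, String> settings) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, String> e : settings.entrySet()) {
            out.append(e.getKey()).append(" = ").append(e.getValue()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(varyingSettings("broker-a", true)));
        System.out.print(render(varyingSettings("broker-b", false)));
    }
}
```

Generating the files from one template makes it harder for a master and its slave to drift apart in the settings they are supposed to share.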
Starting the Brokers
# Run each command on its own host, from /opt/rocketmq
# BrokerA-Master
nohup sh bin/mqbroker -c /opt/rocketmq/conf/broker-a-master.properties &
# BrokerA-Slave
nohup sh bin/mqbroker -c /opt/rocketmq/conf/broker-a-slave.properties &
# BrokerB-Master
nohup sh bin/mqbroker -c /opt/rocketmq/conf/broker-b-master.properties &
# BrokerB-Slave
nohup sh bin/mqbroker -c /opt/rocketmq/conf/broker-b-slave.properties &
Enable Broker Start on Boot
# Create the systemd unit file (broker-a-master shown as the example)
cat > /etc/systemd/system/rocketmq-broker.service << 'EOF'
[Unit]
Description=RocketMQ Broker
After=network.target rocketmq-namesrv.service
[Service]
# mqbroker stays in the foreground, so use Type=simple rather than forking
Type=simple
User=root
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk"
ExecStart=/bin/sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker-a-master.properties
ExecStop=/bin/kill -15 $MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# Enable and start the service
systemctl daemon-reload
systemctl enable rocketmq-broker
systemctl start rocketmq-broker
Verifying the Cluster
1. Check Cluster Status
# Change to the RocketMQ directory
cd /opt/rocketmq
# Check cluster status
sh bin/mqadmin clusterList -n 192.168.1.10:9876
# Sample output:
# #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
# DefaultCluster broker-a 0 192.168.1.20:10911 V5_3_0 0.00(0,0ms) 0.00(0,0ms) 0 0 0.00
# DefaultCluster broker-a 1 192.168.1.21:10921 V5_3_0 0.00(0,0ms) 0.00(0,0ms) 0 0 0.00
# DefaultCluster broker-b 0 192.168.1.22:10911 V5_3_0 0.00(0,0ms) 0.00(0,0ms) 0 0 0.00
# DefaultCluster broker-b 1 192.168.1.23:10921 V5_3_0 0.00(0,0ms) 0.00(0,0ms) 0 0 0.00
2. List Topics
# List topics
sh bin/mqadmin topicList -n 192.168.1.10:9876
# Show the status of a topic
sh bin/mqadmin topicStatus -n 192.168.1.10:9876 -t DefaultCluster
3. Send Test Messages
# Set the environment variable
export NAMESRV_ADDR='192.168.1.10:9876;192.168.1.11:9876'
# Run the sample producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# Run the sample consumer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
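Before running the sample clients, it can help to confirm that every NameServer in the semicolon-separated address list is reachable. The following is a minimal sketch using only the JDK. `NamesrvProbe` and its methods are hypothetical names, and the 2-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

final class NamesrvProbe {
    // Splits a "host:port;host:port" list into unresolved socket addresses.
    static List<InetSocketAddress> parse(String namesrvAddr) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : namesrvAddr.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray separators
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection succeeds within timeoutMillis.
    static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Re-create the address so the host name is resolved at connect time.
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String addrs = System.getenv().getOrDefault("NAMESRV_ADDR", "127.0.0.1:9876");
        for (InetSocketAddress a : parse(addrs)) {
            System.out.println(a.getHostString() + ":" + a.getPort() + " reachable=" + isReachable(a, 2000));
        }
    }
}
```

A successful TCP connection only shows that the port is open; it does not prove that the NameServer has registered any Brokers, which is what `clusterList` checks.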
Deploying a RocketMQ Cluster with Docker
Use Docker Compose to bring up a RocketMQ cluster quickly (the broker .conf files mounted from ./conf must exist before starting):
version: '3.8'
services:
  # NameServer cluster
  namesrv1:
    image: apache/rocketmq:5.3.0
    container_name: rmqnamesrv1
    ports:
      - "9876:9876"
    environment:
      - JAVA_OPT_EXT=-Xms512M -Xmx512M
    command: ["sh","-c","./mqnamesrv"]
    networks:
      - rocketmq
  namesrv2:
    image: apache/rocketmq:5.3.0
    container_name: rmqnamesrv2
    ports:
      - "9877:9876"
    environment:
      - JAVA_OPT_EXT=-Xms512M -Xmx512M
    command: ["sh","-c","./mqnamesrv"]
    networks:
      - rocketmq
  # BrokerA-Master
  broker-a-master:
    image: apache/rocketmq:5.3.0
    container_name: rmqbroker-a-master
    ports:
      - "10911:10911"
      - "10909:10909"
    environment:
      - NAMESRV_ADDR=rmqnamesrv1:9876;rmqnamesrv2:9876
      - JAVA_OPT_EXT=-Xms1G -Xmx1G
    volumes:
      - ./data/broker-a-master/logs:/home/rocketmq/logs
      - ./data/broker-a-master/store:/home/rocketmq/store
      - ./conf/broker-a-master.conf:/home/rocketmq/conf/broker-a-master.conf
    command: ["sh","-c","./mqbroker -c /home/rocketmq/conf/broker-a-master.conf"]
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq
  # BrokerA-Slave
  broker-a-slave:
    image: apache/rocketmq:5.3.0
    container_name: rmqbroker-a-slave
    ports:
      - "10921:10921"
      - "10919:10919"
    environment:
      - NAMESRV_ADDR=rmqnamesrv1:9876;rmqnamesrv2:9876
      - JAVA_OPT_EXT=-Xms1G -Xmx1G
    volumes:
      - ./data/broker-a-slave/logs:/home/rocketmq/logs
      - ./data/broker-a-slave/store:/home/rocketmq/store
      - ./conf/broker-a-slave.conf:/home/rocketmq/conf/broker-a-slave.conf
    command: ["sh","-c","./mqbroker -c /home/rocketmq/conf/broker-a-slave.conf"]
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq
  # BrokerB-Master
  broker-b-master:
    image: apache/rocketmq:5.3.0
    container_name: rmqbroker-b-master
    ports:
      - "10931:10911"
      - "10929:10909"
    environment:
      - NAMESRV_ADDR=rmqnamesrv1:9876;rmqnamesrv2:9876
      - JAVA_OPT_EXT=-Xms1G -Xmx1G
    volumes:
      - ./data/broker-b-master/logs:/home/rocketmq/logs
      - ./data/broker-b-master/store:/home/rocketmq/store
      - ./conf/broker-b-master.conf:/home/rocketmq/conf/broker-b-master.conf
    command: ["sh","-c","./mqbroker -c /home/rocketmq/conf/broker-b-master.conf"]
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq
  # BrokerB-Slave
  broker-b-slave:
    image: apache/rocketmq:5.3.0
    container_name: rmqbroker-b-slave
    ports:
      - "10941:10921"
      - "10939:10919"
    environment:
      - NAMESRV_ADDR=rmqnamesrv1:9876;rmqnamesrv2:9876
      - JAVA_OPT_EXT=-Xms1G -Xmx1G
    volumes:
      - ./data/broker-b-slave/logs:/home/rocketmq/logs
      - ./data/broker-b-slave/store:/home/rocketmq/store
      - ./conf/broker-b-slave.conf:/home/rocketmq/conf/broker-b-slave.conf
    command: ["sh","-c","./mqbroker -c /home/rocketmq/conf/broker-b-slave.conf"]
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq
  # Web console
  console:
    image: styletang/rocketmq-console-ng:latest
    container_name: rmqconsole
    ports:
      - "8080:8080"
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv1:9876;rmqnamesrv2:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq
networks:
  rocketmq:
    driver: bridge
Java Client Examples
1. Maven Dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.0</version>
</dependency>
2. Producer Example
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class RocketMQProducer {
    public static void main(String[] args) throws Exception {
        // Create the producer
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // Set the NameServer addresses
        producer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
        // Start the producer
        producer.start();
        // Send messages
        for (int i = 0; i < 10; i++) {
            Message message = new Message(
                "test_topic",   // Topic
                "tag_a",        // Tag
                "order_" + i,   // Key
                ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8)
            );
            // Synchronous send: blocks until the Broker acknowledges the message
            SendResult result = producer.send(message);
            System.out.println("Message sent: " + result);
        }
        // Shut down the producer
        producer.shutdown();
    }
}
3. Consumer Example
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMQConsumer {
    public static void main(String[] args) throws Exception {
        // Create the consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // Set the NameServer addresses
        consumer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876;192.168.1.12:9876");
        // Start position: consume from the latest offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Subscribe to the topic (all tags)
        consumer.subscribe("test_topic", "*");
        // Register the message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> msgs,
                ConsumeConcurrentlyContext context
            ) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received: " + new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                // Report successful consumption
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.println("Consumer started");
    }
}
4. Ordered Message Example
// Ordered consumer (fragment; MessageListenerOrderly, ConsumeOrderlyStatus and
// ConsumeOrderlyContext come from org.apache.rocketmq.client.consumer.listener)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("192.168.1.10:9876;192.168.1.11:9876");
consumer.subscribe("order_topic", "*");
// MessageListenerOrderly guarantees ordered consumption
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(
        List<MessageExt> msgs,
        ConsumeOrderlyContext context
    ) {
        // Messages from the same queue are consumed in order, by one thread at a time
        for (MessageExt msg : msgs) {
            System.out.println("Ordered consume: " + new String(msg.getBody(), StandardCharsets.UTF_8));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();
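Ordered consumption only guarantees order within a single queue, so the producer side has to send all messages that belong together (for example, all events of one order) to the same queue. The sketch below shows one common way to pick the queue, by hashing an order ID. It uses only the JDK, and the class name and sample events are hypothetical. With the real client, logic like this would typically sit inside a `MessageQueueSelector` passed to `producer.send(message, selector, orderId)`.

```java
final class OrderQueueSelector {
    // Maps an order ID to a queue index in [0, queueCount).
    // floorMod keeps the result non-negative even when hashCode() is negative.
    static int selectQueue(String orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(orderId.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        String[] events = {"order_1:created", "order_2:created", "order_1:paid", "order_1:shipped"};
        for (String event : events) {
            String orderId = event.substring(0, event.indexOf(':'));
            // Every event of the same order lands on the same queue index.
            System.out.println(event + " -> queue " + selectQueue(orderId, 4));
        }
    }
}
```

Note that changing the number of queues changes the mapping, so messages of an order that is still in flight during the change may be split across two queues.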
Monitoring and Operations
1. Using RocketMQ Console
RocketMQ Console (now published as RocketMQ Dashboard) is the official web management UI:
# Download the Console
cd /opt
wget https://github.com/apache/rocketmq-dashboard/releases/download/rocketmq-dashboard-1.0.0/rocketmq-dashboard-1.0.0.jar
# Start the Console (quote the address list so the shell does not treat ";" as a command separator)
nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr='192.168.1.10:9876;192.168.1.11:9876' &
# Access URL
# http://192.168.1.10:8080
2. Common Operations Commands
# Run these from /opt/rocketmq/bin
# Show cluster information
sh mqadmin clusterList -n 192.168.1.10:9876
# Show Broker status
sh mqadmin brokerStatus -n 192.168.1.10:9876 -b 192.168.1.20:10911
# List topics
sh mqadmin topicList -n 192.168.1.10:9876
# Show topic status
sh mqadmin topicStatus -n 192.168.1.10:9876 -t test_topic
# Create a topic
sh mqadmin updateTopic -n 192.168.1.10:9876 -t new_topic -c DefaultCluster
# Delete a topic
sh mqadmin deleteTopic -n 192.168.1.10:9876 -t old_topic -c DefaultCluster
# Show the client connections of a consumer group
sh mqadmin consumerConnection -n 192.168.1.10:9876 -g consumer_group
# Show consumer progress
sh mqadmin consumerProgress -n 192.168.1.10:9876 -g consumer_group
# Query a message by its ID
sh mqadmin queryMsgById -n 192.168.1.10:9876 -i MSG_ID
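3. Troubleshooting Common Problems
Insufficient Memory
# Adjust the JVM parameters
vim /opt/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -Xms2g -Xmx2g -Xmn1g"
Low Disk Space
# Set the file retention time in hours (default 72)
vim /opt/rocketmq/conf/broker-a-master.properties
# Add the following settings
fileReservedTime=48
# Set the disk usage threshold (percent)
diskMaxUsedSpaceRatio=85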
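To judge whether a retention window fits on a disk, a back-of-the-envelope estimate is write rate times message size times retention time. The sketch below does that arithmetic. The traffic figures (2,000 messages per second, 1 KB each) and the 500 GB disk are hypothetical, and the estimate ignores ConsumeQueue and index files, so treat it as a lower bound.

```java
final class RetentionSizing {
    // Approximate CommitLog bytes retained: write rate * message size * retention window.
    static long retainedBytes(long messagesPerSecond, long avgMessageBytes, int fileReservedHours) {
        return messagesPerSecond * avgMessageBytes * 3600L * fileReservedHours;
    }

    // Percentage of the disk that the retained data would occupy.
    static double usedPercent(long retainedBytes, long diskBytes) {
        return 100.0 * retainedBytes / diskBytes;
    }

    public static void main(String[] args) {
        long disk = 500L * 1000 * 1000 * 1000; // hypothetical 500 GB data disk
        for (int hours : new int[] {72, 48}) {
            long bytes = retainedBytes(2000, 1024, hours);
            System.out.printf("fileReservedTime=%d -> %d bytes, %.1f%% of disk%n",
                    hours, bytes, usedPercent(bytes, disk));
        }
    }
}
```

With these assumed numbers, 72 hours of data would not fit on the disk, while 48 hours stays below the 85 percent threshold.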
Message Backlog
# 1. Add more consumer instances
# 2. Increase the number of queues for the topic
sh mqadmin updateTopic -n 192.168.1.10:9876 -c DefaultCluster -t test_topic -r 8 -w 8
# 3. Check consumer progress
sh mqadmin consumerProgress -n 192.168.1.10:9876 -g consumer_group
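Steps 1 and 2 work together: in clustering mode, each queue is assigned to at most one consumer in a group, so adding consumers beyond the queue count does not add throughput. The sketch below shows that limit and how a total backlog can be computed from per-queue offsets. The class name, method names, and offset values are hypothetical.

```java
final class BacklogMath {
    // Total backlog: sum over queues of (broker offset - consumer offset).
    static long totalLag(long[] brokerOffsets, long[] consumerOffsets) {
        if (brokerOffsets.length != consumerOffsets.length) {
            throw new IllegalArgumentException("offset arrays must have the same length");
        }
        long lag = 0;
        for (int i = 0; i < brokerOffsets.length; i++) {
            lag += Math.max(0L, brokerOffsets[i] - consumerOffsets[i]);
        }
        return lag;
    }

    // Consumers that can actually receive messages: capped by the queue count.
    static int activeConsumers(int consumerInstances, int queueCount) {
        return Math.min(consumerInstances, queueCount);
    }

    public static void main(String[] args) {
        long[] broker = {100, 250, 80};
        long[] consumer = {90, 200, 80};
        System.out.println("total lag: " + totalLag(broker, consumer));
        System.out.println("6 consumers, 4 queues: " + activeConsumers(6, 4) + " active");
        System.out.println("6 consumers, 8 queues: " + activeConsumers(6, 8) + " active");
    }
}
```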
Performance Tuning Recommendations
1. JVM Tuning
# Suggested Broker JVM parameters
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"
JAVA_OPT="${JAVA_OPT} -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=200"
JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError"
JAVA_OPT="${JAVA_OPT} -XX:HeapDumpPath=/data/rocketmq/logs/heapdump.hprof"
2. Operating System Tuning
# Raise the file descriptor limit
cat >> /etc/security/limits.conf << 'EOF'
* soft nofile 65536
* hard nofile 65536
EOF
# Enlarge the TCP connection queues
echo 'net.core.somaxconn = 32768' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_max_syn_backlog = 32768' >> /etc/sysctl.conf
# Apply the settings
sysctl -p
3. Network Tuning
# Tune TCP keepalive
echo 'net.ipv4.tcp_keepalive_time = 600' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_keepalive_probes = 3' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_keepalive_intvl = 15' >> /etc/sysctl.conf
# Enable TCP Fast Open
echo 'net.ipv4.tcp_fastopen = 3' >> /etc/sysctl.conf
# Apply the settings
sysctl -p
4. Storage Tuning
- Store the CommitLog on SSDs
- Keep the CommitLog and ConsumeQueue on separate storage
- Use dedicated disks for message storage
- Configure RAID 10 to improve read and write performance
High Availability Best Practices
1. Cross-Datacenter Deployment
# Datacenter A: NameServer1, BrokerA-Master, BrokerB-Slave
# Datacenter B: NameServer2, BrokerA-Slave, BrokerB-Master
# Datacenter C: NameServer3
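The point of this layout is that no Broker keeps its master and slave in the same datacenter. A small check like the following can confirm that for any planned layout. It is only a sketch: the class is hypothetical, and the map keys follow the "<BrokerName>-Master" and "<BrokerName>-Slave" naming used in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PlacementCheck {
    // The layout described in this article: node name -> datacenter.
    static Map<String, String> articleLayout() {
        Map<String, String> roomByNode = new LinkedHashMap<>();
        roomByNode.put("BrokerA-Master", "A");
        roomByNode.put("BrokerB-Slave", "A");
        roomByNode.put("BrokerA-Slave", "B");
        roomByNode.put("BrokerB-Master", "B");
        return roomByNode;
    }

    // True if the master and slave of the given Broker sit in different datacenters.
    // A missing master or slave counts as a failed check.
    static boolean survivesRoomLoss(Map<String, String> roomByNode, String brokerName) {
        String masterRoom = roomByNode.get(brokerName + "-Master");
        String slaveRoom = roomByNode.get(brokerName + "-Slave");
        return masterRoom != null && slaveRoom != null && !masterRoom.equals(slaveRoom);
    }

    public static void main(String[] args) {
        Map<String, String> layout = articleLayout();
        System.out.println("BrokerA ok: " + survivesRoomLoss(layout, "BrokerA"));
        System.out.println("BrokerB ok: " + survivesRoomLoss(layout, "BrokerB"));
    }
}
```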
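2. Regular Backups
#!/bin/bash
# Back up the CommitLog on a schedule (for example from cron)
BACKUP_DIR="/backup/rocketmq/$(date +%Y%m%d)"
mkdir -p "$BACKUP_DIR"
cp -r /data/rocketmq/commitlog/* "$BACKUP_DIR"/
# Delete dated backup directories older than 7 days (never the backup root itself)
find /backup/rocketmq -mindepth 1 -maxdepth 1 -type d -mtime +7 -exec rm -rf {} +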
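The retention rule can also be expressed on the directory names rather than on modification times, which is easier to test. The sketch below decides whether a backup directory named in the yyyyMMdd form used by the script is older than a given number of days. `BackupRetention` is a hypothetical helper, and it only makes the decision; it deletes nothing.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

final class BackupRetention {
    // BASIC_ISO_DATE parses names such as 20240101 (yyyyMMdd).
    private static final DateTimeFormatter DIR_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    // True if dirName is a date more than keepDays before today.
    // Names that are not dates are never treated as expired.
    static boolean isExpired(String dirName, LocalDate today, int keepDays) {
        try {
            LocalDate created = LocalDate.parse(dirName, DIR_FORMAT);
            return ChronoUnit.DAYS.between(created, today) > keepDays;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.now();
        for (String name : new String[] {"20240101", "lost+found"}) {
            System.out.println(name + " expired=" + isExpired(name, today, 7));
        }
    }
}
```

Unlike `find -mtime`, this rule is unaffected by anything that touches the directory after the backup was taken.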
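3. Monitoring and Alerting
- Monitor Broker liveness
- Monitor message backlog
- Monitor disk usage
- Monitor consumer lag
- Set up Prometheus + Grafana for visualization
Summary
This article covered the full process of building a RocketMQ cluster, including:
- Cluster architecture: two masters and two slaves with synchronous replication
- Environment preparation: Java environment and firewall configuration
- Installation and deployment: configuring and starting the NameServers and Brokers
- Docker deployment: a quick setup with Docker Compose
- Client usage: complete producer and consumer examples
- Monitoring and operations: Console management and common commands
- Performance tuning: JVM, operating system, network, and storage
As a message broker open-sourced by Alibaba, RocketMQ is widely used across major internet companies. With sensible configuration and tuning, it can serve as a highly available, high-performance messaging cluster that gives a microservice architecture reliable messaging.
I hope this article helps you get a production-grade RocketMQ cluster running quickly. Questions and discussion are welcome.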