一条消息什么时候由哪台机器产生的、发送的耗时、消息大小、发送状态、存储在哪个 Broker 上、什么时候存储的以及存储在哪台 Broker 上、什么时候消费的、消费状态等信息,这些信息即消息轨迹,用于追踪消息从诞生到被消费的整个生命周期。
这些信息对于业务同学排查定位有着重要的意义,发送和消费往往在不同的业务部门。有了消息轨迹后一条消息有没有发送,发送成功了没,有没有消费一目了然,降低了彼此的沟通成本。
1. Broker 设置
RocketMQ 的默认消息轨迹功能默认是关闭的,可以将 Broker 属性 traceTopicEnable 设置为 true 开启。消息轨迹默认存储在 RMQ_SYS_TRACE_TOPIC 的主题中,可以通过 msgTraceTopicName 修改。
属性 | 默认参数 |
---|---|
traceTopicEnable | false |
msgTraceTopicName | RMQ_SYS_TRACE_TOPIC |
2. 发送端使用
发送轨迹 API
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic){
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
}
说明:enableMsgTrace 是否开启发送轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。
发送代码示例
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try {
Message msg = new Message("TopicTest",
"TagA",
"OrderID111",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
说明:创建 DefaultMQProducer 时将 enableMsgTrace 设置为 true 开启发送消息轨迹。
3. 消费端使用
消费轨迹 API
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
说明:enableMsgTrace 是否开启消费轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。
消费代码示例
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
说明:创建 DefaultMQPushConsumer 将 enableMsgTrace 设置为 true 开启消费消息轨迹。
4. 消息轨迹效果
通过发送和消费一条消息,在 RocketMQ-Console 中看下消息轨迹的效果截图。
发送消息内容
SendResult [sendStatus=SEND_OK, msgId=A9FE1075810A18B4AAC24A40738B0000, offsetMsgId=A9FE107500002A9F0000000000002147, messageQueue=MessageQueue [topic=TopicTest, brokerName=liangyong, queueId=1], queueOffset=2]
消费消息内容
Receive New Messages: [MessageExt [brokerName=liangyong, queueId=1, storeSize=189, queueOffset=2, sysFlag=0, bornTimestamp=1600135337872, bornHost=/169.254.16.117:65532, storeTimestamp=1600135337883, storeHost=/169.254.16.117:10911, msgId=A9FE107500002A9F0000000000002147, commitLogOffset=8519, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=OrderID111, CONSUME_START_TIME=1600135337915, UNIQ_KEY=A9FE1075810A18B4AAC24A40738B0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
消息轨迹展现
在 RocketMQ 控制台,可以通过 message key 或者 message id 检索消息内容,如下图:
点击 MESSAGE TRACE DETAIL 可以查看消息轨迹,如下图:
发送轨迹原理:在消息发送前与发送后收集指标信息,并将指标信息异步发送到轨迹主题。
消费轨迹原理:消费的消息轨迹有两部分,一部分在拉取消息后处理消息前收集指标异步发送都轨迹主题;另一部分处理消息后收集消息指标异步发送到轨迹主题。
消息轨迹类型有三种,Pub 指发送轨迹,SubBefore 指消费前轨迹,SubAfter 指消费后轨迹。
发送轨迹 Pub
名称 | 说明 |
---|---|
TraceType | Pub 表示发送轨迹 |
timeStamp | 存储时间 |
regionId | 机房可用区,默认为 DefaultRegion(目前没有使用) |
groupName | 生产者组 producerGroup |
topic | 主题名称 |
msgId | 消息 ID,由客户端生成 |
tags | 消息 tag |
keys | 消息 key |
storeHost | 消息存储 Broker IP |
bodyLength | 消息体大小 |
costTime | 发送消息耗时 |
msgType | 消息类型:普通消息(Normal_Msg)、事物半消息(Trans_Msg_Half)、 事物提交消息(Trans_msg_Commit)、延迟消息(Delay_Msg) |
offsetMsgId | 消息 Id,由 Broker 生成 |
isSuccess | 发送是否成功,true 表示成功、false 表示失败 |
消费前轨迹 SubBefore
名称 | 说明 |
---|---|
traceType | SubBefore 表示消费前轨迹 |
timeStamp | 消息存储时间 |
regionId | 机房可用区(目前未使用) |
groupName | 消费组名称 |
requestId | 请求标识 |
msgId | 消息 Id |
retryTimes | 重试次数 |
keys | 消息 key |
消费前轨迹 SubAfter
名称 | 说明 |
---|---|
traceType | SubAfter 表示消费后轨迹 |
requestId | 请求标识 |
msgId | 消息 Id |
costTime | 消费耗时 |
isSuccess | 消费结果,true 消费成功、false 消费失败 |
keys | 消息 key |
contextCode | Broker 返回的消费状态,0:SUCCESS,1:TIME_OUT,2:EXCEPTION,3:RETURNNULL,4:FAILED |
访问控制表(Access Control List,ACL)描述用户或角色对资源的访问控制权限,RocketMQ 中的 ACL 见下表说明。
RocketMQ 中的 ACL 含义说明:
含义 | 说明 |
---|---|
用户 | 在 plain_acl.yml 配置文件用 accessKey 表示 |
角色 | admin 和其他角色 |
资源 | 包括主题和消费组 |
权限 | DENY 表示无权限 ANY 表示拥有 PUB 或者 SUB 权限 PUB 表示拥有主题发送权限 SUB 表示拥有消费组订阅权限 |
将 aclEnable = true
添加到 Broker 配置文件中,另外添加 ${ROCKETMQ_HOME}/conf/plain_acl.yml
文件,用于 ACL 控制。
1. Broker 配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable = true
aclEnable = true
说明
参数 | 含义 |
---|---|
aclEnable | 默认 false,开启 ACL 需要设置为 true |
ROCKETMQ_HOME | 可以通过 -Drocketmq.home.dir 指定 MQ 根目录 |
ACL fileName | 默认 /conf/plain_acl.yml,可以通过 -Drocketmq.acl.plain.file 指定 ACL 文件名称 |
2. plain_acl.yml 配置
ACL 配置文件由全局白名单配置(globalWhiteRemoteAddresses)和账户配置(accounts)两部分构成。
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- TopicTes1=DENY
- TopicTest2=PUB|SUB
groupPerms:
- consumerTest=DENY
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
admin: true
说明
参数 | 含义 |
---|---|
globalWhiteRemoteAddresses | 全局白名单配置,策略如下: 空:忽略白名单,继续执行下面校验 全匹配模式:全部放行不会执行后面校验 例如:* 或 ... 或 ::::::: 多 IP 模式:表示白名单 IP 在设置区间段的放行 例如:192.168.0.{1,2} 或 192.168.1.1,192.168.1.2 或 192.168.*. 或 192.168.1-10.5-50 |
accessKey | 唯一用户标识 |
secretKey | 访问密码 |
whiteRemoteAddress | 用户级白名单,格式同 globalWhiteRemoteAddresses |
admin | 是否为管理员,管理员拥有所有资源访问权限 |
defaultTopicPerm | 默认主题权限,默认值 DENY |
defaultGroupPerm | 默认消费组权限,默认值 DENY |
topicPerms | 详细的主题权限 |
groupPerms | 详细的消费组权限 |
3. ACL 发送示例
在上面的配置文件中,将 TopicTes1 设置了 DENY 权限,即禁止发送和消费;将 TopicTest2 设置成了 PUB|SUB 权限,即允许发送和订阅权限。下面例子尝试向主题 TopicTes1 发送消息,观察其是否可以成功。
禁止发送示例
public class AclSendTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest1",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
}
禁止发送截图
禁止发送说明
用户 RocketMQ 向主题 TopicTes1 发送消息时抛出 AclException,拒绝访问,如果将代码中主题换成 TopicTes2,则可以发送成功,接着看下文。
4. ACL 消费示例
在上面的配置文件中,将 consumerTest 设置了 DENY 权限,即禁止消费消息。由于 TopicTes2 设置为允许发送,我们下面尝试向 TopicTes2 发送一条消息,consumerTest 订阅了 TopicTes2 观察其是否可以消费。
允许发送示例
public class AclSendTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest2",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
}
允许发送结果
SendResult [sendStatus=SEND_OK, msgId=C0A800667FB218B4AAC2663AB66F0000, offsetMsgId=C0A8006600002A9F00000000000085EA, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=2]
禁止消费示例
public class AclConsumeTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException, InterruptedException {
pushConsumer();
}
public static void pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest", getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest2", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
printBody(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
private static void printBody(List<MessageExt> msg) {
if (msg == null || msg.size() == 0)
return;
for (MessageExt m : msg) {
if (m != null) {
System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
}
}
}
}
禁止消费截图
禁止消费说明
我们向 TopicTest2 成功发送了一条消息,但由于消费组 consumerTest 被设置成禁止消费,所有未能收到该消息。
RocketMQ 提供了一系列命令动态更新 Acl 配置文件,使设置的权限及时生效。
1. 获取 ACL 配置版本
使用 clusterAclConfigVersion 命令查看版本信息。
参数说明
参数 | 说明 |
---|---|
-b | Broker 地址,更新特定的 Broker |
-c | 集群名称,更新集群中的所有 Broker |
-n | namesrv 地址 |
命令示例
$ bin/mqadmin clusterAclConfigVersion -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #Broker Addr #AclConfigVersionNum #AclLastUpdateTime
DefaultCluster broker-a x.x.x.x:10911 0 2020-09-20 22:42:59
get cluster's plain access config version success.
2. 获取 Acl 权限配置
使用 getAccessConfigSubCommand 获取 ACL 的配置信息。
参数说明
参数 | 说明 |
---|---|
-b | Broker 地址,更新特定的 Broker |
-c | 集群名称,更新集群中的所有 Broker |
-n | namesrv 地址 |
命令示例
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.103.*, 192.168.0.*]
accounts:
accessKey : RocketMQ
secretKey : 12345678
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
groupPerms : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
3. 更新全局白名单
使用 updateGlobalWhiteAddr 对 ACL 的全局白名单 globalWhiteRemoteAddresses 进行变更。
参数说明
参数 | 说明 |
---|---|
-b | Broker 地址,更新特定的 Broker |
-c | 集群名称,更新集群中的所有 Broker |
-n | namesrv 地址 |
-g | 全局白名单值,例如:10.10.103.,192.168.0. |
命令示例
$ bin/mqadmin updateGlobalWhiteAddr -n x.x.x.x:9876 -c DefaultCluster -g 10.10.113.*,192.168.20.*
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update global white remote addresses to x.x.x.x:10911 success.
查看生效
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]
accounts:
accessKey : RocketMQ
secretKey : 12345678
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
groupPerms : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
说明:全局白名单已经被更新。
4. 更新用户配置
对于用户账户的配置的变更通过 updateAclConfig 来实现。
参数说明
参数 | 说明 |
---|---|
-a | 指定 accessKey,变更哪个用户的配置 |
-b | Broker 地址,更新特定的 Broker |
-c | 集群名称,更新集群中的所有 Broker |
-n | namesrv 地址 |
-g | 设置 groupPerms 消费组权限,格式为:groupD=DENY,groupD=SUB |
-i | 设置 Acl 文件中 defaultTopicPerm 权限 |
-m | 设置 Acl 文件中 admin 权限 |
-s | 设置 Acl 文件中 secretKey 密钥值 |
-t | 设置 topicPerms 主题权限,格式为:topicA=DENY,topicD=SUB |
-u | 设置 Acl 文件中的默认消费组 defaultGroupPerm 权限 |
-w | 设置 Acl 文件中该用户下的白名单权限 whiteRemoteAddress |
命令示例
$ bin/mqadmin updateAclConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ -s 87654321 -t testTopicA=DENY,testTopicb=SUB
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create or update plain access config to x.x.x.x:10911 success.
查看生效
$ bin/mqadmin getAccessConfigSubCommand -n uat-mq2.ttbike.com.cn:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]
accounts:
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
accessKey : RocketMQ
secretKey : 87654321
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [testTopicA=DENY, testTopicb=SUB]
groupPerms : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]
说明:用户 RocketMQ 的密钥 secretKey 和主题权限 topicPerms 已变更生效。
5. 删除用户配置
通过 deleteAccessConfig 删除指定用户的 ACL 配置信息。
参数说明
参数 | 说明 |
---|---|
-b | Broker 地址,更新特定的 Broker |
-c | 集群名称,更新集群中的所有 Broker |
-n | namesrv 地址 |
-a | 指定特定用户 accessKey |
命令示例
$ bin/mqadmin deleteAccessConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
delete plain access config account to x.x.x.x:10911 success.
查看生效
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]
accounts:
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
说明:用户 RocketMQ 的相关 Acl 配置已被全部删除。
1. 规则加载
规则配置在 plain_acl.yml 文件中,需要加载到 Broker 缓存中使其生效。启动时会加载 acl 文件的内容,当其变更了也需要动态加载规则内容,详见如下流程。
2. 权限校验
注册的钩子程序通过 NettyServerHandler 实现,在 Broker 当前的 channel 接收到客户端消息时执行校验逻辑,入口为 NettyServerHandler#channelRead0() 以及 processRequestCommand#doBeforeRpcHooks。规则校验的入口位于 PlainAccessValidator#validate 方法,下面是校验简图。
RocketMQ 开源版本在 4.5.0 版本开始支持多副本(DLedger),在以前的版本中只支持主从模式。
主从模式存在的问题:
多副本使用 Raft 协议在节点意外掉线后能够完成自动选主,提高集群的高可用和保证数据的一致性。
由于 DLedger 基于 Raft 协议开发的功能,需要过半数选举,最少 3 个节点组成一个 Raft 组。
broker-n0.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
broker-n1.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
broker-n2.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
启动三个节点:
nohup bin/mqbroker -c conf/dledger/broker-n0.conf &
nohup bin/mqbroker -c conf/dledger/broker-n1.conf &
nohup bin/mqbroker -c conf/dledger/broker-n2.conf &
查看是否启动成功:
$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster RaftNode00 0 x.x.x.x:30921 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000
RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000
RaftCluster RaftNode00 3 x.x.x.x:30931 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000
说明:BID 为 0 表示表示 Master,其他两个均为 Follower。
控制台截图:
查看发送消息:
说明:通过以上步骤,我们完成多副本的搭建过程。
我们通过 kill 掉 Master 的方式来验证 DLedger 选主情况,上面的 clusterList 截图中我们看到 Master 为 x.x.x.x:30921,将该进程 kill 掉后观察一下。
$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster RaftNode00 0 x.x.x.x:30931 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444664.03 -1.0000
RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444664.03 -1.0000
说明:kill 掉原 Master 后,完成自动选主,新的 Master 为 x.x.x.x:30931。
配置文件中多副本的参数说明见下面表格。
参数 | 说明 |
---|---|
enableDLegerCommitLog | 是否启用 DLedger,默认 false |
dLegerGroup | 节点所属的 Raft 组,建议与 broker 一致 |
dLegerPeers | 集群节点信息,示例:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
dLegerSelfId | 当前节点 id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字 |
Raft 的学习资料见下面链接,供学习使用。DLedger 的源码解读,见《RocketMQ 技术内幕》第二版。
使用多副本时,请做好压测,压测的 TPS 是否满足业务的需求,作者曾做过多副本压测与主从异步的 TPS 有相当大的差距。
在 TPS 满足的情况下,建议使用多副本架构,尤其是支付类可以优先使用。
如果线上已经存在了主从默认的架构如何升级到 DLedger 模式呢?