23 消息轨迹、ACL 与多副本搭建

消息轨迹

消息轨迹含义

一条消息什么时候由哪台机器产生的、发送的耗时、消息大小、发送状态、存储在哪个 Broker 上、什么时候存储的以及存储在哪台 Broker 上、什么时候消费的、消费状态等信息,这些信息即消息轨迹,用于追踪消息从诞生到被消费的整个生命周期。

这些信息对于业务同学排查定位有着重要的意义,发送和消费往往在不同的业务部门。有了消息轨迹后一条消息有没有发送,发送成功了没,有没有消费一目了然,降低了彼此的沟通成本。

消息轨迹使用

1. Broker 设置

RocketMQ 的默认消息轨迹功能默认是关闭的,可以将 Broker 属性 traceTopicEnable 设置为 true 开启。消息轨迹默认存储在 RMQ_SYS_TRACE_TOPIC 的主题中,可以通过 msgTraceTopicName 修改。

属性 默认参数
traceTopicEnable false
msgTraceTopicName RMQ_SYS_TRACE_TOPIC

2. 发送端使用

发送轨迹 API

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic){
   this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
}

说明:enableMsgTrace 是否开启发送轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。

发送代码示例

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++)
            try {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID111",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }

说明:创建 DefaultMQProducer 时将 enableMsgTrace 设置为 true 开启发送消息轨迹。

3. 消费端使用

消费轨迹 API

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}

说明:enableMsgTrace 是否开启消费轨迹,默认 false;customizedTraceTopic 设置收集消息轨迹的自定义主题,默认为 RMQ_SYS_TRACE_TOPIC。

消费代码示例

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
}

说明:创建 DefaultMQPushConsumer 将 enableMsgTrace 设置为 true 开启消费消息轨迹。

4. 消息轨迹效果

通过发送和消费一条消息,在 RocketMQ-Console 中看下消息轨迹的效果截图。

发送消息内容

SendResult [sendStatus=SEND_OK, msgId=A9FE1075810A18B4AAC24A40738B0000, offsetMsgId=A9FE107500002A9F0000000000002147, messageQueue=MessageQueue [topic=TopicTest, brokerName=liangyong, queueId=1], queueOffset=2]

消费消息内容

Receive New Messages: [MessageExt [brokerName=liangyong, queueId=1, storeSize=189, queueOffset=2, sysFlag=0, bornTimestamp=1600135337872, bornHost=/169.254.16.117:65532, storeTimestamp=1600135337883, storeHost=/169.254.16.117:10911, msgId=A9FE107500002A9F0000000000002147, commitLogOffset=8519, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=OrderID111, CONSUME_START_TIME=1600135337915, UNIQ_KEY=A9FE1075810A18B4AAC24A40738B0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]] 

消息轨迹展现

在 RocketMQ 控制台,可以通过 message key 或者 message id 检索消息内容,如下图:

img

点击 MESSAGE TRACE DETAIL 可以查看消息轨迹,如下图:

img

消息轨迹原理

发送轨迹原理:在消息发送前与发送后收集指标信息,并将指标信息异步发送到轨迹主题。

img

消费轨迹原理:消费的消息轨迹有两部分,一部分在拉取消息后处理消息前收集指标异步发送都轨迹主题;另一部分处理消息后收集消息指标异步发送到轨迹主题。

img

轨迹格式说明

消息轨迹类型有三种,Pub 指发送轨迹,SubBefore 指消费前轨迹,SubAfter 指消费后轨迹。

发送轨迹 Pub

名称 说明
TraceType Pub 表示发送轨迹
timeStamp 存储时间
regionId 机房可用区,默认为 DefaultRegion(目前没有使用)
groupName 生产者组 producerGroup
topic 主题名称
msgId 消息 ID,由客户端生成
tags 消息 tag
keys 消息 key
storeHost 消息存储 Broker IP
bodyLength 消息体大小
costTime 发送消息耗时
msgType 消息类型:普通消息(Normal_Msg)、事物半消息(Trans_Msg_Half)、 事物提交消息(Trans_msg_Commit)、延迟消息(Delay_Msg)
offsetMsgId 消息 Id,由 Broker 生成
isSuccess 发送是否成功,true 表示成功、false 表示失败

消费前轨迹 SubBefore

名称 说明
traceType SubBefore 表示消费前轨迹
timeStamp 消息存储时间
regionId 机房可用区(目前未使用)
groupName 消费组名称
requestId 请求标识
msgId 消息 Id
retryTimes 重试次数
keys 消息 key

消费前轨迹 SubAfter

名称 说明
traceType SubAfter 表示消费后轨迹
requestId 请求标识
msgId 消息 Id
costTime 消费耗时
isSuccess 消费结果,true 消费成功、false 消费失败
keys 消息 key
contextCode Broker 返回的消费状态,0:SUCCESS,1:TIME_OUT,2:EXCEPTION,3:RETURNNULL,4:FAILED

消息轨迹结语

ACL

ACL 含义

访问控制表(Access Control List,ACL)描述用户或角色对资源的访问控制权限,RocketMQ 中的 ACL 见下表说明。

RocketMQ 中的 ACL 含义说明:

含义 说明
用户 在 plain_acl.yml 配置文件用 accessKey 表示
角色 admin 和其他角色
资源 包括主题和消费组
权限 DENY 表示无权限 ANY 表示拥有 PUB 或者 SUB 权限 PUB 表示拥有主题发送权限 SUB 表示拥有消费组订阅权限

ACL 使用示例

aclEnable = true 添加到 Broker 配置文件中,另外添加 ${ROCKETMQ_HOME}/conf/plain_acl.yml 文件,用于 ACL 控制。

1. Broker 配置

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable = true
aclEnable = true

说明

参数 含义
aclEnable 默认 false,开启 ACL 需要设置为 true
ROCKETMQ_HOME 可以通过 -Drocketmq.home.dir 指定 MQ 根目录
ACL fileName 默认 /conf/plain_acl.yml,可以通过 -Drocketmq.acl.plain.file 指定 ACL 文件名称

2. plain_acl.yml 配置

ACL 配置文件由全局白名单配置(globalWhiteRemoteAddresses)和账户配置(accounts)两部分构成。

globalWhiteRemoteAddresses:

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - TopicTes1=DENY
  - TopicTest2=PUB|SUB
  groupPerms:
  - consumerTest=DENY

- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  admin: true

说明

参数 含义
globalWhiteRemoteAddresses 全局白名单配置,策略如下: 空:忽略白名单,继续执行下面校验 全匹配模式:全部放行不会执行后面校验 例如:* 或 ...::::::: 多 IP 模式:表示白名单 IP 在设置区间段的放行 例如:192.168.0.{1,2} 或 192.168.1.1,192.168.1.2 或 192.168.*. 或 192.168.1-10.5-50
accessKey 唯一用户标识
secretKey 访问密码
whiteRemoteAddress 用户级白名单,格式同 globalWhiteRemoteAddresses
admin 是否为管理员,管理员拥有所有资源访问权限
defaultTopicPerm 默认主题权限,默认值 DENY
defaultGroupPerm 默认消费组权限,默认值 DENY
topicPerms 详细的主题权限
groupPerms 详细的消费组权限

3. ACL 发送示例

在上面的配置文件中,将 TopicTes1 设置了 DENY 权限,即禁止发送和消费;将 TopicTest2 设置成了 PUB|SUB 权限,即允许发送和订阅权限。下面例子尝试向主题 TopicTes1 发送消息,观察其是否可以成功。

禁止发送示例

public class AclSendTest {
    private static final String ACL_ACCESS_KEY = "RocketMQ";
    private static final String ACL_SECRET_KEY = "12345678";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        producer();
    }
    public static void producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1; i++)
            try {
                {
                    Message msg = new Message("TopicTest1",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }
}

禁止发送截图

img

禁止发送说明

用户 RocketMQ 向主题 TopicTes1 发送消息时抛出 AclException,拒绝访问,如果将代码中主题换成 TopicTes2,则可以发送成功,接着看下文。

4. ACL 消费示例

在上面的配置文件中,将 consumerTest 设置了 DENY 权限,即禁止消费消息。由于 TopicTes2 设置为允许发送,我们下面尝试向 TopicTes2 发送一条消息,consumerTest 订阅了 TopicTes2 观察其是否可以消费。

允许发送示例

public class AclSendTest {
    private static final String ACL_ACCESS_KEY = "RocketMQ";
    private static final String ACL_SECRET_KEY = "12345678";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        producer();
    }
    public static void producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1; i++)
            try {
                {
                    Message msg = new Message("TopicTest2",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }
}

允许发送结果

SendResult [sendStatus=SEND_OK, msgId=C0A800667FB218B4AAC2663AB66F0000, offsetMsgId=C0A8006600002A9F00000000000085EA, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=2]

禁止消费示例

public class AclConsumeTest {
    private static final String ACL_ACCESS_KEY = "RocketMQ";
    private static final String ACL_SECRET_KEY = "12345678";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        pushConsumer();
    }

    public static void pushConsumer() throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest2", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20180422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                printBody(msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }

    private static void printBody(List<MessageExt> msg) {
        if (msg == null || msg.size() == 0)
            return;
        for (MessageExt m : msg) {
            if (m != null) {
                System.out.printf("msgId : %s  body : %s  \n\r", m.getMsgId(), new String(m.getBody()));
            }
        }
    }
}

禁止消费截图

img

禁止消费说明

我们向 TopicTest2 成功发送了一条消息,但由于消费组 consumerTest 被设置成禁止消费,所有未能收到该消息。

ACL 命令汇总

RocketMQ 提供了一系列命令动态更新 Acl 配置文件,使设置的权限及时生效。

1. 获取 ACL 配置版本

使用 clusterAclConfigVersion 命令查看版本信息。

参数说明

参数 说明
-b Broker 地址,更新特定的 Broker
-c 集群名称,更新集群中的所有 Broker
-n namesrv 地址

命令示例

$ bin/mqadmin clusterAclConfigVersion -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #Broker Addr            #AclConfigVersionNum  #AclLastUpdateTime
DefaultCluster    broker-a                x.x.x.x:10911      0                     2020-09-20 22:42:59
get cluster's plain access config version success.

2. 获取 Acl 权限配置

使用 getAccessConfigSubCommand 获取 ACL 的配置信息。

参数说明

参数 说明
-b Broker 地址,更新特定的 Broker
-c 集群名称,更新集群中的所有 Broker
-n namesrv 地址

命令示例

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.103.*, 192.168.0.*]

accounts:
  accessKey         : RocketMQ
  secretKey         : 12345678
  whiteRemoteAddress:
  admin             : false
  defaultTopicPerm  : DENY
  defaultGroupPerm  : SUB
  topicPerms        : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
  groupPerms        : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]

  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

3. 更新全局白名单

使用 updateGlobalWhiteAddr 对 ACL 的全局白名单 globalWhiteRemoteAddresses 进行变更。

参数说明

参数 说明
-b Broker 地址,更新特定的 Broker
-c 集群名称,更新集群中的所有 Broker
-n namesrv 地址
-g 全局白名单值,例如:10.10.103.,192.168.0.

命令示例

$ bin/mqadmin updateGlobalWhiteAddr -n x.x.x.x:9876 -c DefaultCluster -g 10.10.113.*,192.168.20.*
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update global white remote addresses to x.x.x.x:10911 success.

查看生效

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]

accounts:
  accessKey         : RocketMQ
  secretKey         : 12345678
  whiteRemoteAddress:
  admin             : false
  defaultTopicPerm  : DENY
  defaultGroupPerm  : SUB
  topicPerms        : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
  groupPerms        : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]

  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

说明:全局白名单已经被更新。

4. 更新用户配置

对于用户账户的配置的变更通过 updateAclConfig 来实现。

参数说明

参数 说明
-a 指定 accessKey,变更哪个用户的配置
-b Broker 地址,更新特定的 Broker
-c 集群名称,更新集群中的所有 Broker
-n namesrv 地址
-g 设置 groupPerms 消费组权限,格式为:groupD=DENY,groupD=SUB
-i 设置 Acl 文件中 defaultTopicPerm 权限
-m 设置 Acl 文件中 admin 权限
-s 设置 Acl 文件中 secretKey 密钥值
-t 设置 topicPerms 主题权限,格式为:topicA=DENY,topicD=SUB
-u 设置 Acl 文件中的默认消费组 defaultGroupPerm 权限
-w 设置 Acl 文件中该用户下的白名单权限 whiteRemoteAddress

命令示例

$ bin/mqadmin updateAclConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ -s 87654321 -t testTopicA=DENY,testTopicb=SUB
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create or update plain access config to x.x.x.x:10911 success.

查看生效

$ bin/mqadmin getAccessConfigSubCommand -n uat-mq2.ttbike.com.cn:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]

accounts:
  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

  accessKey         : RocketMQ
  secretKey         : 87654321
  whiteRemoteAddress:
  admin             : false
  defaultTopicPerm  : DENY
  defaultGroupPerm  : SUB
  topicPerms        : [testTopicA=DENY, testTopicb=SUB]
  groupPerms        : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]

说明:用户 RocketMQ 的密钥 secretKey 和主题权限 topicPerms 已变更生效。

5. 删除用户配置

通过 deleteAccessConfig 删除指定用户的 ACL 配置信息。

参数说明

参数 说明
-b Broker 地址,更新特定的 Broker
-c 集群名称,更新集群中的所有 Broker
-n namesrv 地址
-a 指定特定用户 accessKey

命令示例

$ bin/mqadmin deleteAccessConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
delete plain access config account to x.x.x.x:10911 success.

查看生效

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]

accounts:
  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

说明:用户 RocketMQ 的相关 Acl 配置已被全部删除。

ACL 原理简述

1. 规则加载

规则配置在 plain_acl.yml 文件中,需要加载到 Broker 缓存中使其生效。启动时会加载 acl 文件的内容,当其变更了也需要动态加载规则内容,详见如下流程。

img

2. 权限校验

注册的钩子程序通过 NettyServerHandler 实现,在 Broker 当前的 channel 接收到客户端消息时执行校验逻辑,入口为 NettyServerHandler#channelRead0() 以及 processRequestCommand#doBeforeRpcHooks。规则校验的入口位于 PlainAccessValidator#validate 方法,下面是校验简图。

img

ACL 结语

多副本搭建

多副本意义

RocketMQ 开源版本在 4.5.0 版本开始支持多副本(DLedger),在以前的版本中只支持主从模式。

主从模式存在的问题:

多副本使用 Raft 协议在节点意外掉线后能够完成自动选主,提高集群的高可用和保证数据的一致性。

多副本搭建

由于 DLedger 基于 Raft 协议开发的功能,需要过半数选举,最少 3 个节点组成一个 Raft 组。

broker-n0.conf

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16

broker-n1.conf

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16

broker-n2.conf

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16

启动三个节点:

nohup bin/mqbroker -c conf/dledger/broker-n0.conf &
nohup bin/mqbroker -c conf/dledger/broker-n1.conf &
nohup bin/mqbroker -c conf/dledger/broker-n2.conf &

查看是否启动成功:

$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster       RaftNode00              0     x.x.x.x:30921     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444663.49 -1.0000
RaftCluster       RaftNode00              1     x.x.x.x:30911     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444663.49 -1.0000
RaftCluster       RaftNode00              3     x.x.x.x:30931     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444663.49 -1.0000

说明:BID 为 0 表示表示 Master,其他两个均为 Follower。

控制台截图:

img

查看发送消息:

img

说明:通过以上步骤,我们完成多副本的搭建过程。

重新选主

我们通过 kill 掉 Master 的方式来验证 DLedger 选主情况,上面的 clusterList 截图中我们看到 Master 为 x.x.x.x:30921,将该进程 kill 掉后观察一下。

$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster       RaftNode00              0     x.x.x.x:30931     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444664.03 -1.0000
RaftCluster       RaftNode00              1     x.x.x.x:30911     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444664.03 -1.0000

说明:kill 掉原 Master 后,完成自动选主,新的 Master 为 x.x.x.x:30931。

参数说明

配置文件中多副本的参数说明见下面表格。

参数 说明
enableDLegerCommitLog 是否启用 DLedger,默认 false
dLegerGroup 节点所属的 Raft 组,建议与 broker 一致
dLegerPeers 集群节点信息,示例:n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId 当前节点 id。取自 legerPeers 中条目的开头,即上述示例中的 n0,并且特别需要强调,只能第一个字符为英文,其他字符需要配置成数字

参考资料

Raft 的学习资料见下面链接,供学习使用。DLedger 的源码解读,见《RocketMQ 技术内幕》第二版。

多副本结语

使用多副本时,请做好压测,压测的 TPS 是否满足业务的需求,作者曾做过多副本压测与主从异步的 TPS 有相当大的差距。

在 TPS 满足的情况下,建议使用多副本架构,尤其是支付类可以优先使用。

如果线上已经存在了主从默认的架构如何升级到 DLedger 模式呢?

  1. 可以参考前面平滑扩所容的方式,将多副本组成的 Raft 组加入到原集群中
  2. 关闭原主从架构节点的写入权限
  3. 在过了日志存储时间后,将主从架构节点下线