千家信息网

RocketMQ中怎么实现权限控制

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章将为大家详细讲解有关RocketMQ中怎么实现权限控制,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。1、简单使用1.1、ACL是什么ACL是a
千家信息网最后更新 2025年02月03日RocketMQ中怎么实现权限控制

这篇文章将为大家详细讲解有关RocketMQ中怎么实现权限控制,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

1、简单使用

1.1、ACL是什么

ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

用户:用户是访问控制的基础要素,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。 资源:需要保护的对象,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。 权限:针对资源,能进行的操作。 角色:RocketMQ中,只定义两种角色:是否是管理员。

1.2、RocketMQ中配置ACL

acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下

需要使用acl必须在服务端开启此功能,在Broker的配置文件中配置,aclEnable = true开启此功能

配置plain_acl.yml文件

globalWhiteRemoteAddresses:- 10.10.15.*- 192.168.0.*accounts:- accessKey: RocketMQ  secretKey: 12345678  whiteRemoteAddress:  admin: false  defaultTopicPerm: DENY  defaultGroupPerm: SUB  topicPerms:  - topicA=DENY  - topicB=PUB|SUB  - topicC=SUB  groupPerms:  # the group should convert to retry topic  - groupA=DENY  - groupB=PUB|SUB  - groupC=SUB- accessKey: rocketmq2  secretKey: 12345678  whiteRemoteAddress: 192.168.1.*  # if it is admin, it could access all resources  admin: true

下面我们介绍一下plain_acl.yml文件中相关的参数含义及使用

字段取值含义
globalWhiteRemoteAddresses*;192.168.*.*;192.168.0.1全局IP白名单
accessKey字符串Access Key 用户名
secretKey字符串Secret Key 密码
whiteRemoteAddress*;192.168.*.*;192.168.0.1用户IP白名单
admintrue;false是否管理员账户
defaultTopicPermDENY;PUB;SUB;PUB|SUB默认的Topic权限
defaultGroupPermDENY;PUB;SUB;PUB|SUB默认的ConsumerGroup权限
topicPermstopic=权限各个Topic的权限
groupPermsgroup=权限各个ConsumerGroup的权限

权限标识符的含义

权限含义
DENY拒绝
ANYPUB 或者 SUB 权限
PUB发送权限
SUB订阅权限

处理流程

特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;

对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限

RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点

如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中

1.3、代码示例

1.3.1、生产者代码

public class AclProducer {    public static void main(String[] args) throws MQClientException, InterruptedException {        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());        producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876");        producer.start();        for (int i = 0; i < 10; i++) {            try {                Message msg = new Message("topicA" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                SendResult sendResult = producer.send(msg);                System.out.printf("%s%n", sendResult);            } catch (Exception e) {                e.printStackTrace();                Thread.sleep(1000);            }        }        producer.shutdown();    }    static RPCHook getAclRPCHook() {        return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));    }}

查看结果

报错提示topicA没有权限,我们在plain_acl.yml文件中配置的也确实是RocketMQ用户拒绝,生产消费topicA主题信息,我们改变主题为topicB,则发现发送消息成功,topicB=PUB|SUB设置的权限是生产消费都可以。

查看结果

1.3.2、消费者代码

public class AclConsumer {    public static void main(String[] args) throws InterruptedException, MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(),new AllocateMessageQueueAveragely());        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        consumer.subscribe("topicB", "*");        consumer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876");        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List msgs,                ConsumeConcurrentlyContext context) {                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        consumer.start();        System.out.printf("Consumer Started.%n");    }    static RPCHook getAclRPCHook() {        return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));    }}

查看结果:发现没有任何消息被消费,也没有报错信息,对于RocketMQ用户topicB设置的就是可以可以生产可以消费的,但是我们发现其groupA=DENY是拒绝的,说明消费组是groupA则拒绝消费任何消息,我们改成groupB或者groupC查看结果。

2、源码分析

Broker端ACL原理图

2.1、Broker初始化时ACL相关操作

Broker服务启动时创建BrokerController并初始化initialize()时调用acl相关的初始化方法initialAcl()

private void initialAcl() {        //broker配置文件中是否开启ACL功能,默认关闭    if (!this.brokerConfig.isAclEnable()) {        log.info("The broker dose not enable acl");        return;    }    //获取权限访问校验器的列表,加载的META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中指向    //org.apache.rocketmq.acl.plain.PlainAccessValidator,默认只有一个    List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);    if (accessValidators == null || accessValidators.isEmpty()) {        log.info("The broker dose not load the AccessValidator");        return;    }    for (AccessValidator accessValidator: accessValidators) {        final AccessValidator validator = accessValidator;        //注册服务端就的"钩子"对象,对权限进行校验        this.registerServerRPCHook(new RPCHook() {            @Override            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {                //Do not catch the exception                validator.validate(validator.parse(request, remoteAddr));            }            @Override            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {            }        });    }}

源码中有相关的注解,我们查看一下注册registerServerRPCHook方法

public void registerServerRPCHook(RPCHook rpcHook) {        //服务端的NettyRemotingServer服务注册"钩子"函数    getRemotingServer().registerRPCHook(rpcHook);    this.fastRemotingServer.registerRPCHook(rpcHook);}

关于NettyRemotingServer服务和NettyRemotingClient服务配合使用,后面章节RocketMQ Remoting会重点分析

2.2、 PlainAccessValidator权限验证器

PlainAccessValidator.parse(),根据客户端不同的请求Code其需要的检验资源也不一样

switch (request.getCode()) {        //发送消息需要校验当前的账户的topic是否具有PUB权限    case RequestCode.SEND_MESSAGE:        accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);        break;    case RequestCode.SEND_MESSAGE_V2:        accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);        break;    case RequestCode.CONSUMER_SEND_MSG_BACK:        accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);        accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);        break;    //拉取消息时需要知道该consumer账户下拉取的topic是否具有SUB权限,并且还要知道订阅组consumerGroup是否有sub权限    case RequestCode.PULL_MESSAGE:        accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);        accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);        break;    case RequestCode.QUERY_MESSAGE:        accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);        break;    case RequestCode.HEART_BEAT:        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {            accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);            for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {                accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);            }        }        break;    case RequestCode.UNREGISTER_CLIENT:        final UnregisterClientRequestHeader unregisterClientRequestHeader =            (UnregisterClientRequestHeader) request                .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);        accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);        break;    case RequestCode.GET_CONSUMER_LIST_BY_GROUP:        final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =            (GetConsumerListByGroupRequestHeader) request                .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);        accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);        break;    case RequestCode.UPDATE_CONSUMER_OFFSET:        final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =            (UpdateConsumerOffsetRequestHeader) request                .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);        accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);        accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);        break;    default:        break;}

根据request.getCode()获取当前的操作需要的权限标识集合,供后面与系统的权限配置文件plain_acl.yml中的权限标识符校验时使用

2.3、PlainPermissionLoader资源加载器

Broker初始化相关服务的时候创建了PlainAccessValidator,我们发现其默认的构造方法中调用了其权限资源加载器PlainPermissionLoader

public PlainAccessValidator() {    aclPlugEngine = new PlainPermissionLoader();}

创建PlainPermissionLoader对象

public PlainPermissionLoader() {        //加载服务端的权限文件plain_acl.yml    load();    //开启线程每500ms检测权限文件是否改变,若改变则执行load()从新加载权限文件    watch();}

查看load方法流程

public void load() {    Map plainAccessResourceMap = new HashMap<>();    List globalWhiteRemoteAddressStrategy = new ArrayList<>();    JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,        JSONObject.class);    if (plainAclConfData == null || plainAclConfData.isEmpty()) {        throw new AclException(String.format("%s file  is not data", fileHome + File.separator + fileName));    }    log.info("Broker plain acl conf data is : ", plainAclConfData.toString());    //获取全局白名单IP集合    JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");    if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {        for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {            globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.                    getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));        }    }    //获取账户权限集合    JSONArray accounts = plainAclConfData.getJSONArray("accounts");    if (accounts != null && !accounts.isEmpty()) {        List plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);        for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {                //构建每个账户的权限资源            PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);            //放入Map中AccessKey作为key,该账户的权限资源作为value            plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);        }    }    this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;    this.plainAccessResourceMap = plainAccessResourceMap;}

加载资源文件,解析其中的权限标识,等待权限校验器PlainAccessValidator调用其validate()对权限校验

2.4、权限校验流程

核心的校验方法PlainPermissionLoader.validate()

public void validate(PlainAccessResource plainAccessResource) {    //全局的白名单IP进行校验    for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {        //匹配成功说明是全局的白名单IP,具有所有权限,直接返回。            if (remoteAddressStrategy.match(plainAccessResource)) {            return;        }    }    //判断用户名是否为空,null则抛出AclException异常    if (plainAccessResource.getAccessKey() == null) {        throw new AclException(String.format("No accessKey is configured"));    }    //校验账户是否存在于服务端的权限资源文件中plain_acl.yml,不在则抛出异常    if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {        throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));    }    PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());    //检查该账户的白名单IP是否匹配上客户端IP,匹配成功具有所有权限,除UPDATE_AND_CREATE_TOPIC等特殊权限需要管理员权限    if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {        return;    }    //校验签名    String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());    if (!signature.equals(plainAccessResource.getSignature())) {        throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));    }    //校验账户内的资源权限    checkPerm(plainAccessResource, ownedAccess);}

查看其对于当前账户内部的资源校验

void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {        //判断请求的命令的Code是否需要管理员权限,并判断该用户是否是管理员    if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {        throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));    }    Map needCheckedPermMap = needCheckedAccess.getResourcePermMap();    Map ownedPermMap = ownedAccess.getResourcePermMap();    if (needCheckedPermMap == null) {        // If the needCheckedPermMap is null,then return        return;    }    for (Map.Entry needCheckedEntry : needCheckedPermMap.entrySet()) {        String resource = needCheckedEntry.getKey();        Byte neededPerm = needCheckedEntry.getValue();        //判断是否是group,在构建resourcePermMap时候,group的key=RETRY_GROUP_TOPIC_PREFIX + consumerGroup        boolean isGroup = PlainAccessResource.isRetryTopic(resource);        //系统的权限配置文件中配置项包不含该客户端命令请求需要的权限        if (!ownedPermMap.containsKey(resource)) {            //判断其是否是topic还是group的权限标识,获取该类型的全局的权限是什么            byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :                needCheckedAccess.getDefaultTopicPerm();            //核对权限            if (!Permission.checkPermission(neededPerm, ownedPerm)) {                throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));            }            continue;        }        //系统的权限配置文件中配置项包含该客户端命令请求需要的权限,则直接判断其权限        if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {            throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));        }    }}

所有的检验流程如果有一项不满足则抛出AclException异常

2.5、客户端发送请求

上面图中只是分析了Broker服务端的处理流程,客户端如何调用我们具体分析下我们以发送消息为例:

我们之前分析过Producer的消息发送的核心方法是DefaultMQProducerImpl.sendKernelImpl()该方法

//是否注册了"钩子"if (this.hasSendMessageHook()) {    context = new SendMessageContext();    context.setProducer(this);    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());    context.setCommunicationMode(communicationMode);    context.setBornHost(this.defaultMQProducer.getClientIP());    context.setBrokerAddr(brokerAddr);    context.setMessage(msg);    context.setMq(mq);    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);    if (isTrans != null && isTrans.equals("true")) {        context.setMsgType(MessageType.Trans_Msg_Half);    }    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {        context.setMsgType(MessageType.Delay_Msg);    }    //封装其ACL请求的参数信息    this.executeSendMessageHookBefore(context);}

hasSendMessageHook(),我们在构建Producer的时候创建了该对象,加入到DefaultMQProducerImpl的sendMessageHookList属性中。

我们查看其发送消息NettyRemotingClient类中调用AclClientRPCHook.doBeforeRequest()发送前的数据准备

public void doBeforeRequest(String remoteAddr, RemotingCommand request) {    byte[] total = AclUtils.combineRequestContent(request,        parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));    String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());    request.addExtField(SIGNATURE, signature);    request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());        // The SecurityToken value is unneccessary,user can choose this one.    if (sessionCredentials.getSecurityToken() != null) {        request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());    }}

关于RocketMQ中怎么实现权限控制就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0