天天看點

pulsar的用戶端權限控制功能(二)

1. 概述

上篇文章介紹了用戶端連接時的驗證,這篇文章介紹對於用戶端具體操作的權限控制,包括建立topic、發佈消息和訂閱消息等。

2. 用戶端權限控制

2.1 修改broker.conf檔案,打開權限控制

如果要開啟對權限的控制,首先需要打開對連接的驗證(authenticationEnabled=true),並設定authorizationEnabled=true,然後再調整下列選項。

# Authorization provider fully qualified class-name
# 這個類需要我們自己編寫,實作org.apache.pulsar.broker.authorization.AuthorizationProvider接口即可
authorizationProvider=auth.server.VVPulsarAuthorizationProvider

# Allow wildcard matching in authorization
# (wildcard matching only applicable if wildcard-char:
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=vv-role,cc-role

           

2.2 實作AuthorizationProvider接口

下面只是給了一個demo,裡面沒有對用戶端做任何控制(所有操作一律放行),實際使用時需要在各個方法中加入自己的判斷邏輯。

package auth.server;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * Demo {@link AuthorizationProvider} that logs every authorization callback and then
 * allows it.
 *
 * <p><b>Warning:</b> this implementation performs no access control at all. Every
 * {@code can*}/{@code allow*} check completes with {@code true}, and every
 * {@code grant*}/{@code revoke*} call completes successfully without storing anything.
 * It is a skeleton showing where real checks belong; replace the helper bodies with
 * real decisions before using it for actual access control.
 *
 * @author cc
 * @since 2021/7/27
 */
public class VVPulsarAuthorizationProvider implements AuthorizationProvider {
    private static final Logger log = LoggerFactory.getLogger(VVPulsarAuthorizationProvider.class);

    /** Prefix put in front of every log line written by this provider. */
    private static final String LOG_PREFIX = "vv_auth_v2";

    /**
     * Called once when the provider is set up; this demo only logs the call.
     *
     * @param conf        broker configuration (unused)
     * @param configCache configuration cache (unused)
     */
    @Override
    public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
        log.info("{} initialize", LOG_PREFIX);
    }

    /** Logs the request and allows {@code role} to produce to {@code topicName}. */
    @Override
    public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) {
        return defaultResult("canProduceAsync", topicName, role);
    }

    /** Logs the request and allows {@code role} to consume from {@code topicName}; the subscription is ignored. */
    @Override
    public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) {
        return defaultResult("canConsumeAsync", topicName, role);
    }

    /** Logs the request and allows {@code role} to look up {@code topicName}. */
    @Override
    public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) {
        return defaultResult("canLookupAsync", topicName, role);
    }

    /** Logs the request and allows function operations in {@code namespaceName}. */
    @Override
    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
        return defaultResult("allowFunctionOpsAsync", namespaceName, role);
    }

    /** Logs the request and allows source operations in {@code namespaceName}. */
    @Override
    public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
        return defaultResult("allowSourceOpsAsync", namespaceName, role);
    }

    /** Logs the request and allows sink operations in {@code namespaceName}. */
    @Override
    public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
        return defaultResult("allowSinkOpsAsync", namespaceName, role);
    }

    /** Logs the namespace-level grant request; nothing is stored. */
    @Override
    public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role, String authDataJson) {
        return defaultVoidResult("grantPermissionAsync", namespace, role);
    }

    /** Logs the subscription grant request for {@code roles}; nothing is stored. */
    @Override
    public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles, String authDataJson) {
        // String.valueOf tolerates a null set; the value is only used for logging.
        return defaultVoidResult("grantSubscriptionPermissionAsync", namespace, String.valueOf(roles));
    }

    /** Logs the subscription revoke request; nothing is changed. */
    @Override
    public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, String role, String authDataJson) {
        return defaultVoidResult("revokeSubscriptionPermissionAsync", namespace, role);
    }

    /** Logs the topic-level grant request; nothing is stored. */
    @Override
    public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role, String authDataJson) {
        return defaultVoidResult("grantPermissionAsync", topicName, role);
    }

    /** Called on shutdown; this demo only logs the call. */
    @Override
    public void close() throws IOException {
        log.info("{} close", LOG_PREFIX);
    }

    /** Logs the request and allows any tenant operation on {@code tenantName}. */
    @Override
    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
        logCall("allowTenantOperationAsync", "tenant", tenantName, role);
        return CompletableFuture.completedFuture(Boolean.TRUE);
    }

    /** Logs the request and allows any namespace operation on {@code namespaceName}. */
    @Override
    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
        return defaultResult("allowNamespaceOperationAsync", namespaceName, role);
    }

    /** Logs the request and allows any namespace policy operation on {@code namespaceName}. */
    @Override
    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation, String role, AuthenticationDataSource authData) {
        return defaultResult("allowNamespacePolicyOperationAsync", namespaceName, role);
    }

    /** Logs the request and allows any topic operation on {@code topic}. */
    @Override
    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
        return defaultResult("allowTopicOperationAsync", topic, role);
    }

    /** Logs the request and allows any topic policy operation on {@code topic}. */
    @Override
    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
        return defaultResult("allowTopicPolicyOperationAsync", topic, role);
    }

    /**
     * Logs a topic-scoped check and returns an already-completed "allowed" result.
     *
     * @param name      name of the callback being served, used only for logging
     * @param topicName topic the check applies to
     * @param role      role of the caller
     * @return a future already completed with {@code true}
     */
    CompletableFuture<Boolean> defaultResult(String name, TopicName topicName, String role) {
        logCall(name, "topicName", topicName, role);
        return CompletableFuture.completedFuture(Boolean.TRUE);
    }

    /**
     * Logs a namespace-scoped check and returns an already-completed "allowed" result.
     *
     * @param name          name of the callback being served, used only for logging
     * @param namespaceName namespace the check applies to
     * @param role          role of the caller
     * @return a future already completed with {@code true}
     */
    CompletableFuture<Boolean> defaultResult(String name, NamespaceName namespaceName, String role) {
        logCall(name, "namespace", namespaceName, role);
        return CompletableFuture.completedFuture(Boolean.TRUE);
    }

    /**
     * Logs a topic-scoped grant/revoke request and returns an already-completed future.
     *
     * @param name      name of the callback being served, used only for logging
     * @param topicName topic the request applies to
     * @param role      role (or roles, pre-formatted) the request applies to
     * @return a future already completed with {@code null}
     */
    CompletableFuture<Void> defaultVoidResult(String name, TopicName topicName, String role) {
        logCall(name, "topicName", topicName, role);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Logs a namespace-scoped grant/revoke request and returns an already-completed future.
     *
     * @param name          name of the callback being served, used only for logging
     * @param namespaceName namespace the request applies to
     * @param role          role (or roles, pre-formatted) the request applies to
     * @return a future already completed with {@code null}
     */
    CompletableFuture<Void> defaultVoidResult(String name, NamespaceName namespaceName, String role) {
        logCall(name, "namespace", namespaceName, role);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Writes one uniform log line for an authorization callback.
     *
     * <p>Parameterized logging is used so a {@code null} target is printed as "null"
     * instead of throwing, and no string is built when INFO is disabled.
     *
     * @param name        name of the callback being served
     * @param targetLabel label describing what {@code target} is (topic, namespace, tenant)
     * @param target      resource the callback applies to
     * @param role        role of the caller
     */
    private static void logCall(String name, String targetLabel, Object target, String role) {
        log.info("{} {}, {} {}, role {}", LOG_PREFIX, name, targetLabel, target, role);
    }
}

           

3 了解權限控制

在pulsar中,權限是在role的層次上進行控制的。在用戶端連接的驗證過程中,會儲存用戶端的role,然後在上述接口中可以結合role和具體操作進行限制。

此處的role其實是VVAuthenticationProvider中傳回的,而這個資料是用戶端在連接認證階段攜帶過去的。

因此,可以設定一個認證中心,統一管理所有的使用者權限。用戶端先去認證中心取得自己的使用者資訊,再攜帶使用者資訊連接到broker,然後broker根據使用者資訊去認證中心取得連接權限和其他操作權限,從而實現對使用者的權限控制。