1. 概述
上篇文章寫了用戶端連接配接時的驗證,這篇文章寫下對于用戶端具體操作的權限控制,包括建立topic、釋出消息和訂閱消息等。
2. 用戶端權限控制
2.1 修改broker.conf檔案,打開權限控制
如果要開啟對權限的控制,首先需要打開對連接配接的驗證。
# Authorization provider fully qualified class-name
# 這個類需要我們自己實作,繼承org.apache.pulsar.broker.authorization.AuthorizationProvider接口即可
authorizationProvider=auth.server.VVPulsarAuthorizationProvider
# Allow wildcard matching in authorization
# (wildcard matching only applicable if wildcard-char:
# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
authorizationAllowWildcardsMatching=false
# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=vv-role,cc-role
2.2 實作AuthorizationProvider接口
下面隻是給了一個demo,裡面沒有對用戶端做任何控制。
package auth.server;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* @author cc
* @function
* @date 2021/7/27 14:38
*/
public class VVPulsarAuthorizationProvider implements AuthorizationProvider {
private static final Logger log = LoggerFactory.getLogger(VVPulsarAuthorizationProvider.class);
private static final String methodName = "vv_auth_v2";
@Override
public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
log.info(methodName + " initialize");
}
@Override
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) {
return defaultResult("canProduceAsync", topicName, role);
}
@Override
public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) {
return defaultResult("canConsumeAsync", topicName, role);
}
@Override
public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) {
return defaultResult("canLookupAsync", topicName, role);
}
@Override
public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return defaultResult("allowFunctionOpsAsync", namespaceName, role);
}
@Override
public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return defaultResult("allowSourceOpsAsync", namespaceName, role);
}
@Override
public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return defaultResult("allowSinkOpsAsync", namespaceName, role);
}
@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role, String authDataJson) {
return defaultVoidResult("grantPermissionAsync", namespace, role);
}
@Override
public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles, String authDataJson) {
return defaultVoidResult("grantSubscriptionPermissionAsync", namespace, roles.toString());
}
@Override
public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, String role, String authDataJson) {
return defaultVoidResult("revokeSubscriptionPermissionAsync", namespace, role);
}
@Override
public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role, String authDataJson) {
return defaultVoidResult("grantPermissionAsync", topicName, role);
}
@Override
public void close() throws IOException {
log.info(methodName + " close");
}
@Override
public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, String role, TenantOperation operation, AuthenticationDataSource authData) {
log.info(methodName + " " + tenantName + ", role " + role);
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
permissionFuture.complete(true);
return permissionFuture;
}
@Override
public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String role, NamespaceOperation operation, AuthenticationDataSource authData) {
return defaultResult("allowNamespaceOperationAsync", namespaceName, role);
}
@Override
public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation, String role, AuthenticationDataSource authData) {
return defaultResult("allowNamespacePolicyOperationAsync", namespaceName, role);
}
@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
return defaultResult("allowTopicOperationAsync", topic, role);
}
@Override
public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topic, String role, PolicyName policy, PolicyOperation operation, AuthenticationDataSource authData) {
return defaultResult("allowTopicPolicyOperationAsync", topic, role);
}
CompletableFuture<Boolean> defaultResult(String name, TopicName topicName, String role) {
log.info(methodName + " " + name + ", topicName " + topicName.toString() + ", role " + role);
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
permissionFuture.complete(true);
return permissionFuture;
}
CompletableFuture<Boolean> defaultResult(String name, NamespaceName namespaceName, String role) {
log.info(methodName + " " + name + ", topicName " + namespaceName.toString() + ", role " + role);
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
permissionFuture.complete(true);
return permissionFuture;
}
CompletableFuture<Void> defaultVoidResult(String name, TopicName topicName, String role) {
log.info(methodName + " " + name + ", topicName " + topicName.toString() + ", role " + role);
CompletableFuture<Void> permissionFuture = new CompletableFuture<>();
permissionFuture.complete(null);
return permissionFuture;
}
CompletableFuture<Void> defaultVoidResult(String name, NamespaceName namespaceName, String role) {
log.info(methodName + " " + name + ", topicName " + namespaceName.toString() + ", role " + role);
CompletableFuture<Void> permissionFuture = new CompletableFuture<>();
permissionFuture.complete(null);
return permissionFuture;
}
}
3 了解權限控制
在pulsar中,權限是在role的層次上進行控制的。在用戶端連接配接的驗證過程中,會儲存用戶端的role,然後在上述接口中可以結合role和具體操作進行限制。
此處的role其實是VVAuthenticationProvider中傳回的,而這個資料是用戶端連接配接認證階段用戶端攜帶過去的。
是以,可以設定一個認證中心,統一管理所有的使用者權限。用戶端連接配接後去認證中心擷取自己的使用者資訊,攜帶使用者資訊連接配接到broker,然後broker根據使用者資訊去認證中心擷取連接配接權限和其他操作權限,進而實作對使用者的權限控制。