天天看點

zookeeper和kafka的SASL認證以及生産實踐

一、什麼是zookeeper?

ZooKeeper是一個集中的服務,用于維護配置資訊、命名、提供分布式同步以及提供組服務。所有這些類型的服務都以某種形式被分布式應用程式使用。每次它們被實作時,都有大量的工作需要去修複不可避免的bug和競争條件。由于實作這類服務的困難,應用程式最初通常會略過它們,這使得它們在出現變化時變得脆弱,難以管理。即使做得正确,這些服務的不同實作在部署應用程式時也會導緻管理複雜性。

總之:zookeeper是一種高可靠的分布式協調元件,主要用來解決分布式一緻性和分布式鎖、中繼資料/配置資訊管理的問題。

二、zookeeper的使用場景

  • 分布式協調:這個其實是zk很經典的一個用法,簡單來說,就好比,你A系統發送個請求到mq,然後B消息消費之後處理了。那A系統如何知道B系統的處理結果?用zk就可以實作分布式系統之間的協調工作。A系統發送請求之後可以在zk上對某個節點的值注冊個監聽器,一旦B系統處理完了就修改zk那個節點的值,A立馬就可以收到通知,完美解決。
    zookeeper和kafka的SASL認證以及生産實踐
  • 分布式鎖:對某一個資料連續發出兩個修改操作,兩台機器同時收到了請求,但是隻能一台機器先執行另外一個機器再執行。那麼此時就可以使用zk分布式鎖,一個機器接收到了請求之後先擷取zk上的一把分布式鎖,就是可以去建立一個znode,接着執行操作;然後另外一個機器也嘗試去建立那個znode,結果發現自己建立不了,因為被别人建立了,那隻能等着,等第一個機器執行完了自己再執行。
    zookeeper和kafka的SASL認證以及生産實踐
  • 中繼資料/配置資訊管理:zk可以用作很多系統的配置資訊的管理,比如kafka、storm、Doubbo等等很多分布式系統都會選用zk來做一些中繼資料、配置資訊的管理
    zookeeper和kafka的SASL認證以及生産實踐

三、zookeeper節點有哪些特性,什麼時候使用什麼特性的節點?

在 ZooKeeper 中,節點類型可以分為持久節點(PERSISTENT )、臨時節點(EPHEMERAL),以及有序節點(SEQUENTIAL ),具體在節點建立過程中,一般是組合使用。

  • 持久節點:是指在節點建立後,就一直存在,直到有删除操作來主動清除這個節點——不會因為建立該節點的用戶端會話失效而消失。
  • 持久順序節點:這類節點的基本特性和上面的節點類型是一緻的。額外的特性是,在ZK中,每個父節點會為他的第一級子節點維護一份時序,會記錄每個子節點建立的先後順序。基于這個特性,在建立子節點的時候,可以設定這個屬性,那麼在建立節點過程中,ZK會自動為給定節點名加上一個數字字尾,作為新的節點名。這個數字字尾的範圍是整型的最大值。
  • 臨時節點:與持久節點不同的是,臨時節點的生命周期和用戶端會話綁定。也就是說,如果用戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裡提到的是會話失效,而非連接配接斷開。另外,在臨時節點下面不能建立子節點。
  • 事件監聽:在讀取資料時,我們可以同時對節點設定事件監聽,當節點資料或結構變化時,zookeeper會通知用戶端。目前zookeeper有如下四種事件:1)節點建立;2)節點删除;3)節點資料修改;4)子節點變更。

四、使用zookeeper實作服務注冊中心,原理是什麼?用到了zk的哪些特性?

zookeeper可以充當一個服務系統資料庫(Service Registry),讓多個服務提供者形成一個叢集,讓服務消費者通過服務系統資料庫擷取具體的服務通路位址(ip+端口)去通路具體的服務提供者。具體來說,zookeeper就是個分布式檔案系統,每當一個服務提供者部署後都要将自己的服務注冊到zookeeper的某一路徑上,每當一個服務啟動後會将自己的服務名稱、IP等資訊注冊到zookeeper的ZNode上。

zookeeper和kafka的SASL認證以及生産實踐

服務調用者第一次請求過來的時候會傳回叢集的IP位址的集合并注冊一個服務的監聽,然後在前端通過負載均衡算法(随機,輪詢,權重輪詢,最小連接配接數,hash)将對應的請求發到對應的裝置上,而zookeeper和服務提供者之間會存在一個心跳機制,定時發送一個資料包來檢查該伺服器的狀态,當接收不到資料包時,則會将該裝置注冊的相對應的服務進行删除,然後會push到服務調用者将對應的ip進行删除防止後續請求轉發到已經宕掉的伺服器上。

使用了資料節點持久化,事件監聽的特性,分布式一緻性特性

五、zookeeper叢集搭建

多年之前我寫了一篇簡易的zookeeper入門教程,如果是搭建僞叢集的話可以參考以上這篇文章,位址:[

https://zhupei.blog.csdn.net/article/details/51517460

](

"https://zhupei.blog.csdn.net/article/details/51517460")

下面是搭建生産環境的步驟:

準備3台伺服器,例如node1、node2、node3,首先進入node1中

  • 1、下載下傳zk3.5版本的, http://www.apache.org/dyn/closer.cgi/zookeeper/
  • 2、 解壓到相應目錄,并在目錄下建立dataDir檔案夾
  • 3、在conf目錄在做配置:先拷貝一份 zoo_sample.cfg為zoo.cfg,然後編輯zoo.cfg:
    tickTime=2000
        initLimit=10
        syncLimit=5
        dataDir=/mnt/tools/zookeeper/apache-zookeeper-3.5.5/dataDir/
        clientPort=2181
        server.1=0.0.0.0:2888:3888
        server.2=node2:2888:3888
        server.3=node3:2888:3888
               

這裡注意,node2、node3替換為真實伺服器的ip位址,其中在node1中,這個ip位址要設定為0.0.0.0,不然肯會出現Cannot open channel to 1 at election address /xx.xx.xx.xx:3888的錯誤,特别是在真實的雲伺服器上面很容易出現這個問題。

  • 4、參照前面3步,分别在node2,node3機器上面進行配置,在node2中cong檔案就修改為:
    server.1=node1:2888:3888
        server.2=0.0.0.0:2888:3888
        server.3=node3:2888:3888
               
  • 5、在node3中cong檔案就修改為:
    server.1=node1:2888:3888
        server.2=node2:2888:3888
        server.3=0.0.0.0:2888:3888           
  • 6、啟動,分别在3個節點執行進行啟動
    bin/zkServer.sh  start
               
  • 7、檢視狀态,可以看到3個節點随機有是1個master和2個follow
    `bin/zkServer.sh  status`
               

六、kafka叢集搭建

因為本文主要講zookeeper,是以對于kafka的搭建的就簡易的說一下。這裡提到kafka是因為下面要講到到zookeeper和kafka的SASL機制認證。

安裝kafka叢集之前,確定zookeeper服務已經正常運作。準備3台伺服器,例如node1、node2、node3,首先進入node1中

1、下載下傳kafka,位址

http://kafka.apache.org/downloads

2、解壓到相應目錄,并在安裝目錄下建立logs目錄

3、編輯config中的server.properties檔案,注意這裡的broker.id,部署在不同機器上面的id需要不同,例如在node1中部署,這裡就填寫1,在node2機器上面就填寫2,依次類推。

broker.id=1
listeners=SASL_PLAINTEXT://0.0.0.0:9092
log.dirs=/mnt/tools/kafka/kafka_2.12/logs
num.partitions=3
zookeeper.connect=node1:2181,node2:2181,node3:2181           

4、以上設定都完畢,儲存配置并退出,然後将kafka目錄發送至其他主機

5、在各個節點上啟動

bin/kafka-server-start.sh -daemon config/server.properties
           

6、通過jps可以看到有kafka的程序以及檢視日志都正常的話就表示啟動成功了

七、zookeeper和kafka的安全認證機制SASL

zookeeper在生産環境中,如果不是隻在内網開放的話,就需要設定安全認證,可以選擇SASL的安全認證。以下是和kafka的聯合配置,如果不需要kafka可以去掉kafka相關的權限即可,以下基于zk3.5.5和kafka2.12進行操作。

下面就是詳細的部署步驟:

7.1 zookeeper的安全認證配置

zookeeper所有節點都是對等的,隻是各個節點角色可能不相同。以下步驟所有的節點配置相同。

1、導入kafka的相關jar

從kafka/lib目錄下複制以下幾個jar包到zookeeper的lib目錄下:

kafka-clients-2.3.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar
           

2、zoo.cfg檔案配置

添加如下配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
           

3、編寫JAAS檔案,zk_server_jaas.conf,放置在conf目錄下

這個檔案定義需要連結到Zookeeper伺服器的使用者名和密碼。JAAS配置節預設為Server:

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required 
    username="admin" 
    password="admin-2019" 
    user_kafka="kafka-2019" 
    user_producer="prod-2019";
};
           

這個檔案中定義了兩個使用者,一個是kafka,一個是producer,這些用user_配置出來的使用者都可以提供給生産者程式和消費者程式認證使用。還有兩個屬性,username和password,其中username是配置Zookeeper節點之間内部認證的使用者名,password是對應的密碼。

4、修改zkEnv.sh

在zkEnv.sh添加以下内容,路徑按你直接的實際路徑來填寫:

export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/mnt/tools/zookeeper/apache-zookeeper-3.5.5/conf/zk_server_jaas.conf "
           

5、在各個節點分别執行bin/zkServer.sh start啟動zk。如果啟動異常檢視日志排查問題。

7.2 kafka的安全認證配置

zookeeper啟動之後,就配置kafka,下面步驟的配置在所有節點上都相同。

1、在kafka的config目錄下,建立kafka_server_jaas.conf檔案,内容如下:

KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-2019"
    user_admin="admin-2019"
    user_producer="prod-2019"
    user_consumer="cons-2019";
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka-2019";
};
           

KafkaServer配置的是kafka的賬号和密碼,Client配置節主要配置了broker到Zookeeper的連結使用者名密碼,這裡要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的賬号和密碼相同。

2、配置server.properties,同樣的在config目錄下

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://node1:9092
security.inter.broker.protocol=SASL_PLAINTEXT  
sasl.enabled.mechanisms=PLAIN  
sasl.mechanism.inter.broker.protocol=PLAIN  
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
           

這裡注意listeners配置項,将主機名部分(本例主機名是node1)替換成目前節點的主機名。其他在各個節點的配置一緻。注意,allow.everyone.if.no.acl.found這個配置項預設是false,若不配置成true,後續生産者、消費者無法正常使用Kafka。

3、在server啟動腳本JVM參數,在bin目錄下的kafka-server-start.sh中,将

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
           

這一行修改為

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/mnt/tools/kafka/kafka_2.12/config/kafka_server_jaas.conf"
           

4、配置其他節點

配置剩餘的kafka broker節點,注意server.properties的listeners配置項

5.啟動各個節點的kafka服務端,在bin目錄下執行

./kafka-server-start.sh ../config/server.properties

八、springboot中使用kafka

在springboot項目中,注意:如果在springboot1.x的版本中報錯,請更新為speingboot2.x的版本,本例使用的是2.1.1.RELEASE

1、建立sprinboot工程,添加pom依賴,設定parent為2.1.1.RELEASE,這樣就不需要在dependency中添加版本号了

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
           

2、在resource下建立檔案kafka_client_jaas.conf,内容如下:

KafkaClient {
     org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-2019";
};
           

這個使用者名和密碼要和前面kafka中使用的賬号密碼相同,這樣才能有通路權限

3、在application.yml檔案配置,注意這裡将node1,2,3替換為你真實的伺服器位址

spring:
  kafka:
    listener:
      batch-listener: true                                #是否開啟批量消費,true表示批量消費
      concurrency: 10                                      #設定消費的線程數
      poll-timeout: 1500                                  #自動送出設定,如果消息隊列中沒有消息,等待timeout毫秒後,調用poll()方法。如果隊列中有消息,立即消費消息,每次消費的消息的多少可以通過max.poll.records配置。
    template:
      default-topic: probe2
    producer:
      bootstrap-servers: node1:9092,node2,node3:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: 
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT      
    consumer: 
      bootstrap-servers: node1:9092,node2:9092,node3:9092
      group-id: group-2
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: 
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
      auto:
        offset: 
            reset: latest
      enable:
        auto: 
          commit: true
           

在這裡開啟了消費者10個線程,這裡可以根據你實際業務來調整消費者的數量

4、程式啟動類:

@SpringBootApplication
public class ProbeApplication {
    
    //初始化系統屬性
    static {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.setProperty("java.security.auth.login.config", 
                loader.getResource("").getPath()+File.separator+"kafka_client_jaas.conf");
    }

    public static void main(String[] args) {
         SpringApplication.run(ProbeApplication.class, args);
    }
}
           

5、生産者:

@Component
public class Sender {
    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
    @Autowired
    private KafkaTemplate<String, String> template;
    
    public void send(String message) {
        ListenableFuture<SendResult<String, String>> future = this.template.sendDefault(message);
        future.addCallback(success -> LOG.info("KafkaMessageProducer 發送消息成功!消息内容是:"+message),
                fail -> LOG.error("KafkaMessageProducer 發送消息失敗!消息内容是:"+message));
    }
}
           

6、消費者,将生産者發過來的消息進行處理

@Component
public class Receiver {
    
    @KafkaListener(topics = "probe2")
    public void receiveMessage(ConsumerRecord<String, String> record) {
            System.out.println("【*** 消費者開始接收消息 ***】key = " + record.key() + "、value = " + record.value());
           //TODO,在這裡進行自己的業務操作,例如入庫
    }
}
           

7、controller

@RestController
public class KafkaController {
    @Autowired
    private Sender sender;

    @PostMapping("/send/{msg}")
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}
           

8、啟動springboot工程,然後通路相應的位址即可得到想要的結果

如果生産者的速度大于消費者的速度,可以适當調整生産者和消費者的數量來處理,同時不要在消費者進行太過于耗時的操作。

九、總結

本文主要分享了zookeeper的應用場景和節點特性、注冊原理、zookeeper叢集搭建和kafka叢集搭建、zookeeper和kafka的SASL認證機制、在springboot中實操基于SASL認證的kafka。