一、什麼是zookeeper?
ZooKeeper是一個集中的服務,用于維護配置資訊、命名、提供分布式同步以及提供組服務。所有這些類型的服務都以某種形式被分布式應用程式使用。每次它們被實作時,都有大量的工作需要去修複不可避免的bug和競争條件。由于實作這類服務的困難,應用程式最初通常會略過它們,這使得它們在出現變化時變得脆弱,難以管理。即使做得正确,這些服務的不同實作在部署應用程式時也會導緻管理複雜性。
總之:zookeeper是一種高可靠的分布式協調元件,主要用來解決分布式一緻性和分布式鎖、中繼資料/配置資訊管理的問題。
二、zookeeper的使用場景
- 分布式協調:這個其實是zk很經典的一個用法,簡單來說,就好比,你A系統發送個請求到mq,然後B消息消費之後處理了。那A系統如何知道B系統的處理結果?用zk就可以實作分布式系統之間的協調工作。A系統發送請求之後可以在zk上對某個節點的值注冊個監聽器,一旦B系統處理完了就修改zk那個節點的值,A立馬就可以收到通知,完美解決。
zookeeper和kafka的SASL認證以及生産實踐 - 分布式鎖:對某一個資料連續發出兩個修改操作,兩台機器同時收到了請求,但是隻能一台機器先執行另外一個機器再執行。那麼此時就可以使用zk分布式鎖,一個機器接收到了請求之後先擷取zk上的一把分布式鎖,就是可以去建立一個znode,接着執行操作;然後另外一個機器也嘗試去建立那個znode,結果發現自己建立不了,因為被别人建立了,那隻能等着,等第一個機器執行完了自己再執行。
zookeeper和kafka的SASL認證以及生産實踐 - 中繼資料/配置資訊管理:zk可以用作很多系統的配置資訊的管理,比如kafka、storm、Doubbo等等很多分布式系統都會選用zk來做一些中繼資料、配置資訊的管理
zookeeper和kafka的SASL認證以及生産實踐
三、zookeeper節點有哪些特性,什麼時候使用什麼特性的節點?
在 ZooKeeper 中,節點類型可以分為持久節點(PERSISTENT )、臨時節點(EPHEMERAL),以及有序節點(SEQUENTIAL ),具體在節點建立過程中,一般是組合使用。
- 持久節點:是指在節點建立後,就一直存在,直到有删除操作來主動清除這個節點——不會因為建立該節點的用戶端會話失效而消失。
- 持久順序節點:這類節點的基本特性和上面的節點類型是一緻的。額外的特性是,在ZK中,每個父節點會為他的第一級子節點維護一份時序,會記錄每個子節點建立的先後順序。基于這個特性,在建立子節點的時候,可以設定這個屬性,那麼在建立節點過程中,ZK會自動為給定節點名加上一個數字字尾,作為新的節點名。這個數字字尾的範圍是整型的最大值。
- 臨時節點:與持久節點不同的是,臨時節點的生命周期和用戶端會話綁定。也就是說,如果用戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裡提到的是會話失效,而非連接配接斷開。另外,在臨時節點下面不能建立子節點。
- 事件監聽:在讀取資料時,我們可以同時對節點設定事件監聽,當節點資料或結構變化時,zookeeper會通知用戶端。目前zookeeper有如下四種事件:1)節點建立;2)節點删除;3)節點資料修改;4)子節點變更。
四、使用zookeeper實作服務注冊中心,原理是什麼?用到了zk的哪些特性?
zookeeper可以充當一個服務系統資料庫(Service Registry),讓多個服務提供者形成一個叢集,讓服務消費者通過服務系統資料庫擷取具體的服務通路位址(ip+端口)去通路具體的服務提供者。具體來說,zookeeper就是個分布式檔案系統,每當一個服務提供者部署後都要将自己的服務注冊到zookeeper的某一路徑上,每當一個服務啟動後會将自己的服務名稱、IP等資訊注冊到zookeeper的ZNode上。
服務調用者第一次請求過來的時候會傳回叢集的IP位址的集合并注冊一個服務的監聽,然後在前端通過負載均衡算法(随機,輪詢,權重輪詢,最小連接配接數,hash)将對應的請求發到對應的裝置上,而zookeeper和服務提供者之間會存在一個心跳機制,定時發送一個資料包來檢查該伺服器的狀态,當接收不到資料包時,則會将該裝置注冊的相對應的服務進行删除,然後會push到服務調用者将對應的ip進行删除防止後續請求轉發到已經宕掉的伺服器上。
使用了資料節點持久化,事件監聽的特性,分布式一緻性特性
五、zookeeper叢集搭建
多年之前我寫了一篇簡易的zookeeper入門教程,如果是搭建僞叢集的話可以參考以上這篇文章,位址:[
https://zhupei.blog.csdn.net/article/details/51517460](
"https://zhupei.blog.csdn.net/article/details/51517460")
下面是搭建生産環境的步驟:
準備3台伺服器,例如node1、node2、node3,首先進入node1中
- 1、下載下傳zk3.5版本的, http://www.apache.org/dyn/closer.cgi/zookeeper/
- 2、 解壓到相應目錄,并在目錄下建立dataDir檔案夾
- 3、在conf目錄在做配置:先拷貝一份 zoo_sample.cfg為zoo.cfg,然後編輯zoo.cfg:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/mnt/tools/zookeeper/apache-zookeeper-3.5.5/dataDir/ clientPort=2181 server.1=0.0.0.0:2888:3888 server.2=node2:2888:3888 server.3=node3:2888:3888
這裡注意,node2、node3替換為真實伺服器的ip位址,其中在node1中,這個ip位址要設定為0.0.0.0,不然肯會出現Cannot open channel to 1 at election address /xx.xx.xx.xx:3888的錯誤,特别是在真實的雲伺服器上面很容易出現這個問題。
- 4、參照前面3步,分别在node2,node3機器上面進行配置,在node2中cong檔案就修改為:
server.1=node1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=node3:2888:3888
- 5、在node3中cong檔案就修改為:
server.1=node1:2888:3888 server.2=node2:2888:3888 server.3=0.0.0.0:2888:3888
- 6、啟動,分别在3個節點執行進行啟動
bin/zkServer.sh start
- 7、檢視狀态,可以看到3個節點随機有是1個master和2個follow
`bin/zkServer.sh status`
六、kafka叢集搭建
因為本文主要講zookeeper,是以對于kafka的搭建的就簡易的說一下。這裡提到kafka是因為下面要講到到zookeeper和kafka的SASL機制認證。
安裝kafka叢集之前,確定zookeeper服務已經正常運作。準備3台伺服器,例如node1、node2、node3,首先進入node1中
1、下載下傳kafka,位址
http://kafka.apache.org/downloads2、解壓到相應目錄,并在安裝目錄下建立logs目錄
3、編輯config中的server.properties檔案,注意這裡的broker.id,部署在不同機器上面的id需要不同,例如在node1中部署,這裡就填寫1,在node2機器上面就填寫2,依次類推。
broker.id=1
listeners=SASL_PLAINTEXT://0.0.0.0:9092
log.dirs=/mnt/tools/kafka/kafka_2.12/logs
num.partitions=3
zookeeper.connect=node1:2181,node2:2181,node3:2181
4、以上設定都完畢,儲存配置并退出,然後将kafka目錄發送至其他主機
5、在各個節點上啟動
bin/kafka-server-start.sh -daemon config/server.properties
6、通過jps可以看到有kafka的程序以及檢視日志都正常的話就表示啟動成功了
七、zookeeper和kafka的安全認證機制SASL
zookeeper在生産環境中,如果不是隻在内網開放的話,就需要設定安全認證,可以選擇SASL的安全認證。以下是和kafka的聯合配置,如果不需要kafka可以去掉kafka相關的權限即可,以下基于zk3.5.5和kafka2.12進行操作。
下面就是詳細的部署步驟:
7.1 zookeeper的安全認證配置
zookeeper所有節點都是對等的,隻是各個節點角色可能不相同。以下步驟所有的節點配置相同。
1、導入kafka的相關jar
從kafka/lib目錄下複制以下幾個jar包到zookeeper的lib目錄下:
kafka-clients-2.3.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar
2、zoo.cfg檔案配置
添加如下配置:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
3、編寫JAAS檔案,zk_server_jaas.conf,放置在conf目錄下
這個檔案定義需要連結到Zookeeper伺服器的使用者名和密碼。JAAS配置節預設為Server:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-2019"
user_kafka="kafka-2019"
user_producer="prod-2019";
};
這個檔案中定義了兩個使用者,一個是kafka,一個是producer,這些用user_配置出來的使用者都可以提供給生産者程式和消費者程式認證使用。還有兩個屬性,username和password,其中username是配置Zookeeper節點之間内部認證的使用者名,password是對應的密碼。
4、修改zkEnv.sh
在zkEnv.sh添加以下内容,路徑按你直接的實際路徑來填寫:
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/mnt/tools/zookeeper/apache-zookeeper-3.5.5/conf/zk_server_jaas.conf "
5、在各個節點分别執行bin/zkServer.sh start啟動zk。如果啟動異常檢視日志排查問題。
7.2 kafka的安全認證配置
zookeeper啟動之後,就配置kafka,下面步驟的配置在所有節點上都相同。
1、在kafka的config目錄下,建立kafka_server_jaas.conf檔案,内容如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-2019"
user_admin="admin-2019"
user_producer="prod-2019"
user_consumer="cons-2019";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-2019";
};
KafkaServer配置的是kafka的賬号和密碼,Client配置節主要配置了broker到Zookeeper的連結使用者名密碼,這裡要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的賬号和密碼相同。
2、配置server.properties,同樣的在config目錄下
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://node1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
這裡注意listeners配置項,将主機名部分(本例主機名是node1)替換成目前節點的主機名。其他在各個節點的配置一緻。注意,allow.everyone.if.no.acl.found這個配置項預設是false,若不配置成true,後續生産者、消費者無法正常使用Kafka。
3、在server啟動腳本JVM參數,在bin目錄下的kafka-server-start.sh中,将
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
這一行修改為
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/mnt/tools/kafka/kafka_2.12/config/kafka_server_jaas.conf"
4、配置其他節點
配置剩餘的kafka broker節點,注意server.properties的listeners配置項
5.啟動各個節點的kafka服務端,在bin目錄下執行
./kafka-server-start.sh ../config/server.properties
八、springboot中使用kafka
在springboot項目中,注意:如果在springboot1.x的版本中報錯,請更新為speingboot2.x的版本,本例使用的是2.1.1.RELEASE
1、建立sprinboot工程,添加pom依賴,設定parent為2.1.1.RELEASE,這樣就不需要在dependency中添加版本号了
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、在resource下建立檔案kafka_client_jaas.conf,内容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-2019";
};
這個使用者名和密碼要和前面kafka中使用的賬号密碼相同,這樣才能有通路權限
3、在application.yml檔案配置,注意這裡将node1,2,3替換為你真實的伺服器位址
spring:
kafka:
listener:
batch-listener: true #是否開啟批量消費,true表示批量消費
concurrency: 10 #設定消費的線程數
poll-timeout: 1500 #自動送出設定,如果消息隊列中沒有消息,等待timeout毫秒後,調用poll()方法。如果隊列中有消息,立即消費消息,每次消費的消息的多少可以通過max.poll.records配置。
template:
default-topic: probe2
producer:
bootstrap-servers: node1:9092,node2,node3:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
consumer:
bootstrap-servers: node1:9092,node2:9092,node3:9092
group-id: group-2
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
auto:
offset:
reset: latest
enable:
auto:
commit: true
在這裡開啟了消費者10個線程,這裡可以根據你實際業務來調整消費者的數量
4、程式啟動類:
@SpringBootApplication
public class ProbeApplication {
//初始化系統屬性
static {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
System.setProperty("java.security.auth.login.config",
loader.getResource("").getPath()+File.separator+"kafka_client_jaas.conf");
}
public static void main(String[] args) {
SpringApplication.run(ProbeApplication.class, args);
}
}
5、生産者:
@Component
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, String> template;
public void send(String message) {
ListenableFuture<SendResult<String, String>> future = this.template.sendDefault(message);
future.addCallback(success -> LOG.info("KafkaMessageProducer 發送消息成功!消息内容是:"+message),
fail -> LOG.error("KafkaMessageProducer 發送消息失敗!消息内容是:"+message));
}
}
6、消費者,将生産者發過來的消息進行處理
@Component
public class Receiver {
@KafkaListener(topics = "probe2")
public void receiveMessage(ConsumerRecord<String, String> record) {
System.out.println("【*** 消費者開始接收消息 ***】key = " + record.key() + "、value = " + record.value());
//TODO,在這裡進行自己的業務操作,例如入庫
}
}
7、controller
@RestController
public class KafkaController {
@Autowired
private Sender sender;
@PostMapping("/send/{msg}")
public String send(@PathVariable("msg") String msg) {
sender.send(msg);
return msg;
}
}
8、啟動springboot工程,然後通路相應的位址即可得到想要的結果
如果生産者的速度大于消費者的速度,可以适當調整生産者和消費者的數量來處理,同時不要在消費者進行太過于耗時的操作。
九、總結
本文主要分享了zookeeper的應用場景和節點特性、注冊原理、zookeeper叢集搭建和kafka叢集搭建、zookeeper和kafka的SASL認證機制、在springboot中實操基于SASL認證的kafka。